Orestes: a Time Series Database Backed by Cassandra and Elasticsearch

I used to work at a data analysis startup called Jut. Jut’s vision was to bring all your data together in a single environment, enabling integrated analysis with our programming language, Juttle. This was challenging because there are many different types of data, and different types require different models for optimal storage and querying. At the highest level, Jut divided all data into two kingdoms: metrics and events. Today I’ll cover the design and implementation of the metrics side, which was handled by a database we built called Orestes.

Metrics

Metrics are a useful tool for developers. A metric is a repeated measurement of the same quantity as it varies over time. For instance, if I measure the CPU usage on my server every ten seconds while my application is running, that’s a metric. Metrics are widely used to analyze the performance of software projects. There are tools like statsd and collectd that make them easy to gather. In a large system with a lot of metrics, it is critical to store metric data in a space-efficient way that enables fast queries. The most popular data model for achieving this is the time series data model.

The time series data model figures out what metrics are being measured, and it associates each metric with a list of measurements of that metric. For instance, let’s say I measured the CPU usage on computers A and B at 12:00, 1:00 and 2:00, and I got 10%, 20%, and 30% on computer A; 15%, 25%, and 35% on computer B.

The traditional way of storing data is the relational data model. Storing this metric data with a relational model creates these records:

{metric_name: 'cpu_usage', computer: 'A', time: 12:00, value: 10%}
{metric_name: 'cpu_usage', computer: 'A', time: 1:00, value: 20%}
{metric_name: 'cpu_usage', computer: 'A', time: 2:00, value: 30%}
{metric_name: 'cpu_usage', computer: 'B', time: 12:00, value: 15%} 
{metric_name: 'cpu_usage', computer: 'B', time: 1:00, value: 25%} 
{metric_name: 'cpu_usage', computer: 'B', time: 2:00, value: 35%}

It stores the metric_name and computer values with each data point. Since there’s only one metric_name and two computers, this is a lot of unnecessary duplication. The time series data model eliminates this duplication, storing the data much more efficiently. Here’s how this data looks in the time series data model:

{metric_name: 'cpu_usage', computer: 'A'} ----> [{time: 12:00, value: 10%}, {time: 1:00, value: 20%}, {time: 2:00, value: 30%}]
{metric_name: 'cpu_usage', computer: 'B'} ----> [{time: 12:00, value: 15%}, {time: 1:00, value: 25%}, {time: 2:00, value: 35%}]

Neat! It stores each combination of metric_name and computer only once, associating each combination with its list of measurements.
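
Here’s a minimal Python sketch of that grouping, purely for illustration (this isn’t how Orestes stores anything, it just shows the shape of the model):

from collections import defaultdict

# Flat, relational-style records: the tags are repeated on every point.
records = [
    {'metric_name': 'cpu_usage', 'computer': 'A', 'time': '12:00', 'value': 10},
    {'metric_name': 'cpu_usage', 'computer': 'A', 'time': '1:00',  'value': 20},
    {'metric_name': 'cpu_usage', 'computer': 'B', 'time': '12:00', 'value': 15},
]

# Group by everything except time and value to get the time series form.
series = defaultdict(list)
for r in records:
    tags = tuple(sorted((k, v) for k, v in r.items() if k not in ('time', 'value')))
    series[tags].append({'time': r['time'], 'value': r['value']})

for tags, points in series.items():
    print(dict(tags), '---->', points)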

Orestes

The Requirements

After experimenting with a few open source time series databases, we concluded that we had to write our own. The open source options could not scale to the number of distinct metrics we required. Jut’s ideal time series database also had a few specific needs that we had to design for.

First, for event data we were already using Elasticsearch, which offers fast and flexible search and analysis features. To present a unified view of the data regardless of whether it was metrics or events, our time series database had to have a search API at least as expressive as Elasticsearch’s.

Second, we needed to be able to delete data that was no longer useful. Jut allowed users to set retention policies for their metrics and events. All data points older than the number of days configured in the retention policy had to be deleted every day at midnight, and the user could change the retention policy at any time.

To meet our scaling needs and support these features, we designed and implemented our own time series database, which I named Orestes. Orestes combines the speed and scale of Apache Cassandra with the search functionality of Elasticsearch. When Jut was at its peak, Orestes executed high-performance queries on billions of data points across millions of distinct metrics every day. Let’s see how it works!

Cassandra

Orestes uses Cassandra for most of its data storage and retrieval. This is because Cassandra’s data model is easily adapted for time series data. Cassandra divides data into rows. A row is identified by its row key, which is a set of key-value pairs. Each row contains several columns, which are also sets of key-value pairs. A single stored object is represented by the key-value pairs of a column plus the key-value pairs of the row key for the row containing that column. For instance, for our CPU data above, one of the row keys is {metric_name: 'cpu_usage', computer: 'A'}, corresponding to the columns [{time: 12:00, value: 10%}, {time: 1:00, value: 20%}, {time: 2:00, value: 30%}].

For Orestes, we wanted the row key for a given data point to consist of all the fields of the point except time and value, also known as the tags of the point. But there’s a complication: Cassandra requires users to fully specify each table’s attributes when that table is created. Unless you know in advance what all the attributes of all your data will be, there’s no built-in way to tell Cassandra to use all the tags as a row key. Since Jut was building a platform for storage and analysis of arbitrary data, we couldn’t list all the possible fields.

To get around this limitation, Orestes’ tables have just one attribute defining the row key. The value of this attribute for a given data point is a serialized string representation of the tags of that point. For instance, Orestes’ row key for our metric {metric_name: 'cpu_usage', computer: 'A'} is "computer=A,metric_name=cpu_usage": an alphabetized, comma-delimited list of key=value substrings. This enables efficient storage of arbitrary time series data in Cassandra. Querying a row is quick, too: given a row key, Cassandra can read a slice of its row in around ten microseconds on my laptop.
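
As a rough illustration (not Orestes’ actual code), the serialization amounts to something like this:

def serialize_tags(point):
    # Everything except time and value is a tag; sort the keys so that the
    # same tag set always produces the same row key string.
    tags = {k: v for k, v in point.items() if k not in ('time', 'value')}
    return ','.join('%s=%s' % (k, tags[k]) for k in sorted(tags))

print(serialize_tags({'metric_name': 'cpu_usage', 'computer': 'A',
                      'time': '12:00', 'value': 10}))
# prints: computer=A,metric_name=cpu_usage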

An Orestes Table

Putting it all together, a sample table-creation statement used by Orestes is

CREATE TABLE metrics16688 (attrs varchar, offset int, value double, PRIMARY KEY(attrs, offset)) WITH COMPACT STORAGE

The Table Name

Note that the name of our table, metrics16688, has a strange integer suffix. It turns out that this is what enables us to delete expired data. The suffix represents the number of days since January 1, 1970 for the earliest data point stored in the table. It will always be a multiple of 7, because Orestes creates one table for each week of data.
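
Assuming the weeks are aligned to multiples of 7 days since the epoch, as the suffix 16688 suggests, computing a point’s table name looks roughly like this:

MS_PER_DAY = 24 * 60 * 60 * 1000

def table_for(timestamp_ms):
    # Days since January 1, 1970, rounded down to a multiple of 7.
    days_since_epoch = timestamp_ms // MS_PER_DAY
    week_start_day = days_since_epoch - days_since_epoch % 7
    return 'metrics%d' % week_start_day

print(table_for(1441843200000))   # a timestamp in September 2015 -> metrics16688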

To delete unwanted data, Orestes just tells Cassandra to drop the table containing it. This means that if you want to delete any data from Orestes, you need to delete the whole week that contains that data. This is an unfortunate restriction, but it was the best way we found of working with Cassandra’s deletion quirkiness.

Deleting from Cassandra

Cassandra ostensibly has a delete-by-query API, but this API is a lie. When you delete data using Cassandra’s delete-by-query API, it really just places tombstones on the data points you “deleted”. These tombstoned points are actually deleted the next time Cassandra performs a compaction on the table containing them. Compaction is part of Cassandra’s self-regulated maintenance, but it occurs at unpredictable intervals and very infrequently on a table that isn’t being written to.

Tombstoned points still take up as much disk space as non-tombstoned points. Keeping them around for an arbitrary amount of time after we’d allegedly deleted them was not an option. Luckily, Cassandra deletes data and frees disk space immediately upon dropping a table, provided you have auto_snapshot: false set in your cassandra.yaml configuration file.

By dropping tables whose numeric suffix is too old, we got Cassandra to delete data. We chose a granularity of one week instead of something smaller because Cassandra has scaling problems with many tables. Each table costs around a megabyte of heap overhead, so if you have thousands of tables you’ll run into memory pressure. With one table per week, we can store data for decades.
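
In sketch form, the nightly cleanup boils down to finding week tables whose newest possible day is older than the retention cutoff and dropping them. The table list and printed DROP statements below are illustrative; Orestes’ real cleanup logic differs in its details.

import time

MS_PER_DAY = 24 * 60 * 60 * 1000

def expired_suffixes(existing_suffixes, retention_days, now_ms=None):
    now_ms = int(time.time() * 1000) if now_ms is None else now_ms
    cutoff_day = now_ms // MS_PER_DAY - retention_days
    # A table with suffix N covers days N through N + 6, so it can only be
    # dropped once even its last day falls before the cutoff.
    return [s for s in existing_suffixes if s + 6 < cutoff_day]

for suffix in expired_suffixes([16681, 16688, 16695], retention_days=30):
    print('DROP TABLE metrics%d' % suffix)   # statement to send to Cassandra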

The Table Fields

The first field in our table is attrs varchar (varchar just means “string”). This is the row key described earlier: for a given data point, it contains a string representing all of that point’s tags. Note that attrs is also listed as the first field in the PRIMARY KEY. This makes attrs the partition key for our table, so each distinct attrs value defines a row.

The second field and second component in the primary key is offset int. offset is a number of milliseconds since the beginning of the week covered by the table. There are fewer than 2^32 milliseconds in a week, so we can safely use an int for this field since we get a new table every week. Since offset is the second column in the primary key, Cassandra uses offset as a clustering column. That means points within a row are sorted according to their offset. Thus each of our rows in Cassandra is stored in chronological order, which is just what we want for time series data.

The final field in our table is value double. This is the value measured for a metric. So given a JSON object with a time, a value, and optional tags, Orestes serializes the tags into the attrs string, calculates what week the time corresponds to and the offset within that week, and stores the attrs, offset and value in that week’s table.
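
Sketched in Python, the write path for a single point looks something like this. The serialize_tags helper is the one sketched earlier, and the parameter binding at the end is an assumption standing in for a real Cassandra driver call.

MS_PER_DAY = 24 * 60 * 60 * 1000

def locate(point):
    ts = point['time']                            # milliseconds since the epoch
    week_start_day = (ts // MS_PER_DAY) // 7 * 7  # the table suffix, as above
    offset = ts - week_start_day * MS_PER_DAY     # ms into the week, < 2^32
    return 'metrics%d' % week_start_day, offset

point = {'metric_name': 'cpu_usage', 'computer': 'A',
         'time': 1441850400000, 'value': 10.0}
table, offset = locate(point)
cql = 'INSERT INTO %s (attrs, offset, value) VALUES (?, ?, ?)' % table
# bind and execute with (serialize_tags(point), offset, point['value'])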

The final words of our incantation are WITH COMPACT STORAGE. Long story short, compact storage stores data more efficiently than a table created without it. In exchange, you lose the ability to add or remove columns from the table. Orestes doesn’t need to do that, so we used compact storage. Cassandra’s documentation trash-talks compact storage a bit, but actual users see up to 30x lower space usage with it.

Search

Orestes’ approach to serializing row keys makes it fast to look up a row given a row key, but it makes it slow to filter on individual tags. Let’s say you wanted all the metrics for computer A. The data is not indexed by the computer field, only by the combination of all the tag fields in the single attrs string. The only way to find all the metrics for computer A is to scan every row in Cassandra, parse each attrs into its component key-value pairs, and return the rows where computer is A.
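
Concretely, without an index the only available plan is a full scan along these lines (the row keys here are just a stand-in for the millions Orestes actually held):

def parse_attrs(attrs):
    # Turn "computer=A,metric_name=cpu_usage" back into a dict of tags.
    return dict(kv.split('=', 1) for kv in attrs.split(','))

all_row_keys = ['computer=A,metric_name=cpu_usage',
                'computer=B,metric_name=cpu_usage']   # imagine millions of these

matches = [a for a in all_row_keys if parse_attrs(a).get('computer') == 'A']
# Every stored row key gets parsed and checked, no matter how few match.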

Before Orestes, Jut used a time series database called KairosDB that implemented this approach. KairosDB was quick for the first thousand metrics or so, but it quickly slowed down as we added more metrics. At around 100,000 metrics, it became completely non-responsive. Even if we only asked for a few metrics, KairosDB scanned through everything it had stored to find the ones we wanted. In order to scale Jut to web-scale metric data, we needed a more efficient way of searching our row keys.

Since we were already using Elasticsearch for event data, it was natural to use it here as well. When Orestes imports a metric point, in addition to storing attrs, offset and value in Cassandra, it stores the point’s tags as a document in Elasticsearch. Elasticsearch maintains in memory an object called the inverted index that maps each key-value pair to the list of documents in which that key-value pair occurs. This enables Elasticsearch to quickly answer queries such as “get all the row keys for computer A”. Furthermore, it opens up a wide variety of search options, including glob matching, regular expressions, and full text search, that are impossible to do efficiently with Cassandra alone.
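
The documents themselves are just the tag objects. Here’s a hedged sketch of what gets indexed and what a search looks like; the index name 'rowkeys' and the use of the attrs string as the document ID are assumptions made for this example, not necessarily Orestes’ actual layout.

import json

# What gets indexed for each distinct row key: just the tags.
tags = {'metric_name': 'cpu_usage', 'computer': 'A'}
doc_id = 'computer=A,metric_name=cpu_usage'   # the attrs string is a natural ID

# "All the row keys for computer A" becomes a term query against those documents:
search_body = {'query': {'term': {'computer': 'A'}}}
print(json.dumps(search_body))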

Putting it All Together

When a user ran a Juttle program querying certain metrics, the Juttle compiler translated the Juttle filter for those metrics into an Elasticsearch query matching the row keys for those metrics. Orestes sent this query to Elasticsearch to get the matching row keys. Then, Orestes made a select request to Cassandra for each row. It sent the points retrieved from these requests to the Juttle Processing Core, which handled the rest of the Juttle program.
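
In outline, the read path looks like the sketch below, where search_row_keys and read_row are hypothetical wrappers standing in for the Elasticsearch and Cassandra calls; Orestes’ real query engine is more involved.

def query_metrics(search_row_keys, read_row, es_query, from_ms, to_ms):
    points = []
    # 1. Ask Elasticsearch which row keys match the user's filter.
    for attrs in search_row_keys(es_query):
        # 2. Read each matching row's slice of [from_ms, to_ms) from the
        #    week table(s) in Cassandra.
        points.extend(read_row(attrs, from_ms, to_ms))
    # 3. Return the points in time order for downstream processing
    #    (the Juttle Processing Core, in Jut's case).
    return sorted(points, key=lambda p: p['time'])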

With this architecture, we managed to handle writing up to 15,000 metric data points per second on a single host. We could make efficient, expressive queries on these data points. Despite our technical strength, Jut ended up collapsing on March 28, 2016. But Orestes lives on as an open source project on GitHub. Check it out!

Special thanks to my colleagues Mike Demmer and Andrew Swan, who contributed to the design and implementation of Orestes.
