Juttle’s Cross Platform Optimization Strategy

Juttle can analyze data that is in any of several data stores. It has processors like reduce, sort, head, and tail that operate over a sequence of data points. For instance, head n emits the first n points it receives and drops the rest. reduce count() increments a counter for each data point and returns the total. The built-in implementations of these processors are written in Node.js.
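
To make that concrete, here's a minimal sketch (not Juttle's actual source) of what a streaming head-style processor looks like in Node.js: it forwards the first n points it receives and silently drops the rest.

// Minimal sketch of a head-like processor -- illustrative only, not Juttle's code.
// It forwards the first n points to emit() and drops everything after that.
function make_head(n, emit) {
    var seen = 0;
    return function on_point(point) {
        if (seen < n) {
            seen++;
            emit(point);
        }
        // points after the first n are dropped
    };
}

// Forward at most 2 points to console.log.
var head2 = make_head(2, function(point) { console.log(point); });
[{ a: 1 }, { a: 2 }, { a: 3 }].forEach(head2);   // logs { a: 1 } and { a: 2 }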

These Node.js processors are functionally correct, but they are often not the most efficient way to operate on data stored in a given database. For instance, the fastest way to count the records in a SQL table is SELECT COUNT(*) FROM my_table. But to count the records in a table using the Node.js implementation of reduce count(), we’d have to SELECT * FROM my_table, build a Juttle data point from every record, and perform the count in Javascript. It would be much faster if Juttle knew it could use SELECT COUNT(*) to calculate the result of reduce count(). The Juttle Optimizer is the piece in Juttle’s architecture that turns Juttle programs into efficient queries like this.
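
To make the difference concrete, here's a rough sketch of the two approaches. query_rows and query_value are hypothetical stand-ins for a SQL driver, not real Juttle APIs.

// Hypothetical helpers, for illustration only: query_rows and query_value stand
// in for whatever SQL driver is in use; neither is a real Juttle API.

// The naive path: pull back every row and count in Javascript, the way the
// generic reduce count() implementation would.
function count_naive(query_rows) {
    return query_rows('SELECT * FROM my_table').then(function(rows) {
        var count = 0;
        rows.forEach(function() { count++; });
        return count;
    });
}

// The optimized path: let the database do the counting.
function count_optimized(query_value) {
    return query_value('SELECT COUNT(*) FROM my_table');
}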

The Juttle Compiler

The optimizer is one of the phases in the Juttle Compiler. The Juttle Compiler is a program that transforms Juttle programs into executable Javascript. The first stage of the compiler is the parser, which uses peg.js to turn a Juttle program into an Abstract Syntax Tree (AST). An AST is a Javascript object that represents the structure of a Juttle program. Here’s an example: for the program

sub my_read() { read elastic -last :hour: }

my_read | head 1

the AST is

{
    "type": "MainModuleDef",
    "name": "main",
    "elements": [
        {
            "type": "SubDef",
            "name": "my_read",
            "elements": [
                {
                    "type": "SequentialGraph",
                    "elements": [
                        {
                            "type": "ReadProc",
                            "name": "read",
                            "adapter": {
                                "type": "Identifier",
                                "name": "elastic"
                            },
                            "options": [
                                {
                                    "type": "ProcOption",
                                    "id": "last",
                                    "expr": {
                                        "type": "DurationLiteral",
                                        "value": "01:00:00.000"
                                    }
                                }
                            ]
                        }
                    ]
                }
            ]
        },
        {
            "type": "SequentialGraph",
            "elements": [
                {
                    "type": "FunctionProc",
                    "op": {
                        "type": "Variable",
                        "name": "my_read"
                    }
                },
                {
                    "type": "SingleArgProc",
                    "name": "head",
                    "arg": {
                        "type": "NumberLiteral",
                        "value": 1
                    }
                }
            ]
        }
    ]
}

Whew! Like our Juttle program, the AST has two top-level elements. The first is a subgraph definition (SubDef) for a subgraph named my_read. my_read consists of a read processor with a few options. The second is a SequentialGraph that invokes my_read and sends the output to the head processor. If Juttle were totally naive, it would read all the points in Elasticsearch from the last hour and send them all to head. head would then throw away all but the first point. It is much faster to ask Elasticsearch for only the first point.

In the early days of Juttle, the optimizer looked at the beginning of each SequentialGraph in the AST for optimizable sequences like read | head. When it found such a sequence, it set special optimization options on the read, and the read used these options to make a more efficient query. For instance, read | head 1 became a read with a limit option set to 1. The implementation of read queried Elasticsearch for only one data point when it detected a limit option with the value 1.
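
In rough terms, that early pass amounted to a pattern match like the sketch below (simplified names, not the original implementation):

// Illustrative sketch of the early AST-level pass -- simplified, not the
// original source. Given a SequentialGraph's elements, detect `read | head n`
// and fold the limit into the read's options so the adapter can fetch only n points.
function optimize_graph_ast(elements) {
    var read = elements[0];
    var next = elements[1];

    if (read && read.type === 'ReadProc' &&
        next && next.name === 'head' &&
        next.arg && next.arg.type === 'NumberLiteral') {
        read.options.push({
            type: 'ProcOption',
            id: 'limit',
            expr: { type: 'NumberLiteral', value: next.arg.value }
        });
    }
    // If elements[0] is a FunctionProc (a subgraph call), we have no idea what
    // it expands to, so a program like `my_read | head 1` can't be optimized here.
}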

This was a good start, but it failed in cases like our example where programs used subgraphs. The FunctionProc AST element only tells us that there is some function call at that point in the program. It doesn’t tell us anything about what that function call does. This isn’t enough information to tell whether the program can be optimized, so our original optimizer had to abort optimization here. On large datasets, subgraphs became too slow to use. Since subgraphs are a critical part of Juttle’s expressivity, we needed to fix this.

The Flowgraph Builder

To enable optimization of subgraph-containing programs, we added a stage to the compiler called the flowgraph builder. The flowgraph builder runs between the parser and the optimizer. It expands user-defined subgraphs in an AST to the built-in processors they correspond to. The flowgraph for our program looks like this:

{
    "built_graph": {
        "nodes": [
            {
                "type": "ReadProc",
                "name": "read",
                "adapter": {
                    "type": "Identifier",
                    "name": "elastic"
                },
                "options": [
                    {
                        "id": "last",
                        "val": {
                            "type": "DurationLiteral",
                            "value": "01:00:00.000",
                        }
                    }
                ],
            },
            {
                "type": "BuiltinProc",
                "name": "head",
                "options": [
                    {
                        "id": "arg",
                        "val": {
                            "type": "NumberLiteral",
                            "value": 1,
                        }
                    }
                ],
            }
        ],
        "now": "2016-03-27T21:23:45.979Z",
        "stats": {
            "imports": 0,
            "subs": 1
        }
    }
}

The subgraph has been taken out of the picture entirely. This is the same flowgraph as the one for the subgraphless program read elastic -last :hour: | head 1. In this format, we can easily analyze and optimize the flowgraph. Let’s see how the optimizer works in this case.
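
Conceptually, the expansion the flowgraph builder performs boils down to something like this sketch (simplified names, not the real implementation):

// Simplified sketch of subgraph expansion -- not the actual flowgraph builder.
// Each FunctionProc that refers to a sub is replaced by the processors in that
// sub's body, so later passes see only built-in processors.
function expand_subs(elements, subs) {
    var nodes = [];
    elements.forEach(function(element) {
        if (element.type === 'FunctionProc' && subs[element.op.name]) {
            // splice in the sub's body, expanding any subs it calls in turn
            nodes = nodes.concat(expand_subs(subs[element.op.name], subs));
        } else {
            nodes.push(element);
        }
    });
    return nodes;
}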

The Optimizer

Each adapter can implement its own optimizer. An adapter’s optimizer translates flowgraph processors into specialized queries for that adapter’s datastore. It does so by implementing functions named optimize_head, optimize_tail, optimize_reduce, or any other processor name prefixed by optimize_. Juttle’s core optimizer has a function called optimize_read that delegates to the appropriate adapter’s optimize_ functions. Let’s check it out:

function optimize_read(source_node, graph) {
    var optimization_info = {};
    var successfully_optimized;
    var first_node = true;
    var adapter = adapters.get(source_node.adapter.name, source_node.adapter.location);
    var outs = graph.node_get_outputs(source_node);
    var next_node = outs[0];

    while (outs && !outs_are_unoptimizable(outs, graph)) {
        if (OPTIMIZATION_FUNCTIONS.hasOwnProperty(next_node.name)) {
            var optimize_func = OPTIMIZATION_FUNCTIONS[next_node.name];
            successfully_optimized = optimize_func(source_node, next_node, graph, adapter, optimization_info);
        } else {
            successfully_optimized = false;
            if (first_node) {
                return utils.optimization_disabled(graph, source_node, 'not_optimizable');
            }
            break;
        }

        first_node = false;
        if (successfully_optimized) {
            outs = graph.node_get_outputs(next_node);
            graph.remove_node(next_node);
            next_node = outs[0];
        } else {
            break;
        }
    }

    graph.node_set_param(source_node, 'optimization_info', optimization_info);
}

optimize_read takes two arguments: the flowgraph element representing the read (the first object in the nodes array of our example flowgraph), and a reference to the flowgraph itself. The flowgraph object has a few convenience methods for accessing properties of its elements.

First, optimize_read allocates an optimization_info object. optimization_info is passed to adapters’ optimize_ functions. Those functions set properties on optimization_info. The modified optimization_info is passed at runtime to the read processor, which uses its properties to build optimized queries.

In addition to setting optimization_info properties, each optimize_ function returns true if optimization is successful. optimize_read stores this information in the successfully_optimized variable.

adapters.get returns a reference to the adapter for a given read. Since our example uses read elastic, adapters.get returns the Elastic adapter.

The node_get_outputs function returns the processors that our read sends points to. In our example, this is just the head processor. outs_are_unoptimizable is a little function that checks for a few cases that we haven’t implemented optimization for, like a read that sends points into multiple branches of a parallel graph. If outs_are_unoptimizable(outs, graph) is false, we check whether we can optimize the processor following the read by looking it up in the OPTIMIZATION_FUNCTIONS object. OPTIMIZATION_FUNCTIONS currently has methods for head, tail, sort, and reduce. Here’s its method for head; the other methods are similar:

var ALLOWED_OPTIONS_HEAD = ['arg'];

function optimize_head(source, head, graph, adapter, optimization_info) {
    if (! graph.node_contains_only_options(head, ALLOWED_OPTIONS_HEAD)) {
        utils.optimization_disabled(graph, source, 'unsupported_head_option');
        return;
    }

    var optimizer = adapter.optimizer;

    if (optimizer && typeof optimizer.optimize_head === 'function') {
        return optimizer.optimize_head(source, head, graph, optimization_info);
    }
}

optimize_head validates the options to see if they’re optimizable. The only optimizable head option is arg. arg is the name for the numeric argument to head (1 in our example). Since our head only has the arg option, we make it past this check. Then, we check whether the adapter has an optimizer with an optimize_head function. If it does, as the Elastic adapter does, we delegate to the adapter’s head optimization implementation.

Here’s the Elastic adapter’s optimize_head method:

optimize_head: function(read, head, graph, optimization_info) {
    if (optimization_info.type && optimization_info.type !== 'head') {
        logger.debug('optimization aborting -- cannot append head optimization to prior', optimization_info.type, 'optimization');
        return false;
    }

    var limit = graph.node_get_option(head, 'arg');

    if (optimization_info.hasOwnProperty('limit')) {
        limit = Math.min(limit, optimization_info.limit);
    }

    optimization_info.type = 'head';
    optimization_info.limit = limit;
    return true;
}

First, we check optimization_info.type. If it’s not set, this head is the first processor after the read. If it’s 'head', this head follows another head that followed the read, as in read | head 2 | head 1. We optimize both of these cases, but not any others. We could optimize programs like read | tail 2 | head 1, but they are fringe enough that we don’t bother yet.

Otherwise, we calculate the proper limit, taking the minimum with any limit a previous head has already recorded, and set it as a property on optimization_info. optimize_head returns true to indicate to optimize_read that it successfully optimized the head.
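
To trace this with a chained example, here's how optimization_info evolves for read elastic -last :hour: | head 2 | head 1:

// Worked example: read elastic -last :hour: | head 2 | head 1
var optimization_info = {};

// optimize_head for `head 2`: no prior optimization, so just record it.
optimization_info.type = 'head';
optimization_info.limit = 2;

// optimize_head for `head 1`: a head optimization already exists, so the
// limit tightens to Math.min(1, 2).
optimization_info.limit = Math.min(1, optimization_info.limit);   // 1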

optimize_read then checks to see if it can optimize the next processor. It can do so when it has successfully optimized every processor so far and there are still processors it hasn’t considered. In this case, the loop continues with the modified optimization_info and an updated next_node. This enables adapters to optimize sequences of processors. In our example, we’re at the end of the flowgraph, so there is no next processor and optimization stops.

The runtime implementation for the Elastic adapter takes the limit off optimization_info and uses it as the size parameter on the Elasticsearch query. Thus, we get only the data point we care about, and we don’t have to pointlessly page through all the rest of the data.
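
Roughly speaking, the query it builds looks like the sketch below. The field name and time filter are illustrative; the important part is size, which comes straight from optimization_info.limit.

// Rough sketch of the Elasticsearch request body for
// `read elastic -last :hour: | head 1`. The time field and filter shape are
// illustrative; size is the piece the head optimization contributes.
var optimization_info = { type: 'head', limit: 1 };

var search_body = {
    size: optimization_info.limit,            // fetch a single document
    sort: [{ time: { order: 'asc' } }],       // head wants the earliest points
    query: {
        range: { time: { gte: 'now-1h' } }    // the -last :hour: window
    }
};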

Cool!

So that’s how head optimization works in the Elastic adapter. The Elastic adapter also supports optimizations for tail and reduce with count, count_unique, avg, max, min, sum, or stdev. The SQL adapter supports all those and sort as well. Optimization is also featured in the adapters for Google Analytics, files, http endpoints, and a few others. This optimization framework, where the compiler in the Juttle core delegates to optimization functions implemented by each adapter, has proven to be quite flexible. It sits at the heart of our efforts to make Juttle a speedy language.
