FlumeBase User Guide

version 0.2.0


Table of Contents

1. Introduction
2. Quick Start
3. Installation
3.1. Prerequisites
3.2. Program installation
4. Configuration
4.1. Server configuration
4.2. Client configuration
5. Architecture
6. The rtsql language
6.1. DDL Commands
6.1.1. CREATE STREAM
6.1.2. DROP STREAM
6.1.3. SHOW STREAMS
6.1.4. SHOW FUNCTIONS
6.1.5. DESCRIBE
6.1.6. EXPLAIN
6.2. SELECT statements
6.2.1. Stream references
6.2.2. WHERE clauses
6.2.3. JOIN clauses
6.2.4. Aggregation
6.2.5. WINDOW clauses
6.2.6. Querying event properties and attributes
6.3. Data types and value ranges
6.3.1. Integral types
6.3.2. The BOOLEAN type
6.3.3. Floating-point types
6.3.4. Precise numeric types
6.3.5. The STRING type
6.3.6. The BINARY type
6.3.7. The TIMESTAMP type
6.3.8. The LIST type
6.3.9. Type coercion
6.3.10. Polymorphic types and type classes
6.3.11. Variable-length function argument lists
7. The FlumeBase server
7.1. Starting a foreground server
7.2. Starting a daemon server
7.3. Stopping the server
8. The FlumeBase shell
8.1. Starting the shell
8.2. Connecting to the execution environment
8.3. Monitoring flows
8.4. Controlling flows
8.5. Controlling the session configuration
8.6. Miscellaneous commands
9. Function Reference
9.1. Event property accessor functions
9.2. Aggregate functions
9.3. Functions that operate on lists
9.4. Functions to work with binary data
9.5. Functions that operate on strings
9.6. Functions that operate on timestamps
9.7. Functions that operate on numbers

1. Introduction

FlumeBase is a database-inspired stream processing system built on top of Flume. This system allows users to dynamically insert queries into a data collection environment and inspect the stream of events being collected by Flume. These queries may spot-check incoming data, or specify persistent monitoring, data transformation, or quality filtering tasks. Queries are written in a SQL-like language called "rtsql."

FlumeBase can present data back to users of an interactive shell environment. It can also be configured to deliver streams of output events back into a Flume network, for consumption by other tools or persistence in HBase, HDFS, or other storage media.

The emphasis of this system is on low-latency analysis of incoming data being captured by Flume. The name "rtsql" (FlumeBase's query language) underscores the real-time nature of the query system, as well as the SQL-based origin of the query language syntax. It is hoped that FlumeBase will allow you to perform useful in-line data transformation or filtering, or time-sensitive alerting or tuning of a broader system, before subjecting the data being captured by Flume to a deeper (but perhaps higher-latency) analysis with other tools such as Hadoop MapReduce.

Warning

FlumeBase is an EXPERIMENTAL system! This is in no way ready for production use. Use this AT YOUR OWN RISK. Connecting this system to production Flume nodes may result in data loss, misconfiguration, or other serious problems.

This document explains how to install and configure the FlumeBase system. It then explains the rtsql language, used to submit queries to the runtime environment, and the commands used to control the terminal client itself. This document is intended for:

  • System administrators
  • Data analysts
  • Data engineers

2. Quick Start

For those who understand Flume and SQL and just want to see a demo of what can be done with FlumeBase, follow the steps in this section. This is a five-minute tour of the FlumeBase world.

First, copy the following text into a file named data.txt.

1,aaron,purple,42
2,bob,blue,11
3,cindy,green,312
    

Install Flume 0.9.3, Hadoop 0.20, and Java 6. If you are running Cloudera's Distribution for Hadoop 3 beta 4 (CDH3B4), you already have the required versions of Flume and Hadoop. Users of older versions of these products will need to upgrade. See Section 3, “Installation” for more thorough installation instructions.

Unzip the FlumeBase installation:

$ tar vzxf flumebase-(version).tar.gz
    

Start the FlumeBase shell:

$ cd flumebase-(version)/
$ bin/flumebase shell
    

By default, FlumeBase is configured with a self-contained environment that embeds the FlumeBase server and Flume itself within the same process as the shell. Now let's define a stream over the file, and query it.

rtsql> CREATE STREAM data(id int, name string, favcolor string,
    -> luckynumber int) FROM LOCAL SOURCE 'tail("/path/to/data.txt")';
CREATE STREAM

rtsql> SELECT * FROM data;
    

You created a stream which operates over a local (self-hosted) Flume logical node that reads all the lines from data.txt. You then ran a query that extracts all fields from each event in the stream. Each line of the file corresponds to a different event.

In another terminal, now execute the following:

$ echo 4,dave,orange,611 >> /path/to/data.txt    
    

You should observe that as soon as Flume detects the new record (about a second's delay), it will be passed along to FlumeBase and emitted on your console.

The submitted query has created a "flow," which runs as long as we allow it. If more data were to enter Flume via that file, we would continue to process it. Now, let's cancel that flow:

rtsql> \d 1
    

(As FlumeBase decommissions the internal logical node, Flume itself may emit an error; this is normal. In general, running in a single process is "noisy," because client and server activity are both printed to the same console. For a cleaner session, run the server and client in separate processes; see Section 4, “Configuration” for instructions.)

And now let's run another query:

rtsql> SELECT favcolor FROM data WHERE luckynumber = 42;    
    

After a few seconds, this flow is initialized with the data in the Flume logical node. Note that only one row from the original data set is emitted. If you append more lines whose luckynumber column is 42 to the file, you'll see them appear in the FlumeBase console.

This concludes our tour. To quit the FlumeBase shell, run:

rtsql> \q    
    

The remaining sections of this user guide will describe multi-process configuration, the rtsql language, and shell operation in greater detail. Good luck!

3. Installation

3.1. Prerequisites

FlumeBase requires a few prerequisites before it can be run on your machine:

  • Java 6.0
  • Hadoop 0.20
  • Flume 0.9.3

Java can be obtained from http://www.oracle.com/technetwork/java/index.html. The Java SE 6 JRE (or JDK) is required; download and installation instructions are available on Oracle's web site.

The other two prerequisites (Hadoop and Flume) can be installed from Cloudera's Distribution for Hadoop, version 3-beta-4 (CDH3b4) or newer. See http://archive.cloudera.com for instructions on downloading and installing Cloudera's Distribution for Hadoop.

While FlumeBase is written in Java and thus should be portable across a wide variety of operating systems, testing has only been performed under Linux. It is likely to work under Cygwin and OS X as well, but no guarantees are made.

The following prerequisite knowledge is required to understand this documentation:

  • Basic computer technology and terminology
  • Familiarity with command-line interfaces such as bash
  • Prior understanding of Flume's operation and purpose
  • Prior exposure to SQL is recommended

3.2. Program installation

FlumeBase itself is distributed as a tar file. Install FlumeBase by unzipping the tar file:

$ tar vzxf flumebase-(version).tar.gz      
        

This will expand to a directory called flumebase-(version)/.

4. Configuration

By default, FlumeBase is configured to run in a single process combining both the interactive shell and the execution engine. Terminating the shell also terminates the execution environment, including all running queries. This is most useful for evaluating FlumeBase. For more serious use, the execution environment should run in a persistent process on a server. Clients should then be configured to connect to this server, or users should be instructed to connect to it explicitly.

To enable zero-configuration evaluation of FlumeBase, the FlumeBase process also hosts an embedded Flume master node. To interact with existing streaming data sources, reconfigure FlumeBase to use the master of your existing Flume deployment instead (see the embedded.flume.master setting below).

4.1. Server configuration

Install FlumeBase on a server where the query execution engine should be run. Then edit the etc/flumebase-site.xml file to contain the following values:

Table 1. Configuration settings for FlumeBase servers

Property                                Value
flume.home                              The path to $FLUME_HOME on your server.
flumebase.remote.port                   The port where the FlumeBase server listens for clients.
embedded.flume.master                   This should be set to false if a Flume master is available. A value of true means that the FlumeBase environment acts as its own Flume master, separate from an existing Flume network.
flumebase.flume.master.host             The hostname of the foreign Flume master to connect to.
flumebase.flume.master.port             The port the foreign Flume master listens on.
flumebase.flume.collector.port.min/max  FlumeBase uses Flume collectors to receive data from the broader Flume network. Set ...port.min and ...port.max to the range of ports on the FlumeBase server which the FlumeBase daemon may use for this purpose.

A set of environment variables also govern how FlumeBase locates various directories, and how FlumeBase instantiates its JVM. Default values for these can be set in a file named etc/flumebase-env.sh. A template version of this file is available for your convenience. The FlumeBase server will use a pidfile to ensure that multiple instances of the server do not contend for open ports, etc. The pidfile will be placed in the directory identified by $FLUMEBASE_PID_DIR. A FlumeBase server running in the background will write to a log file in the directory identified by $FLUMEBASE_LOG_DIR.

Finally, to run in distributed mode, the Flume master node needs to register the FlumeBase plugin. You should copy the flumebase-(version).jar file from the FlumeBase installation into /usr/lib/flume/lib on the Flume master machine. Then edit flume-site.xml on the master to include the setting:

<property>
  <name>flume.plugin.classes</name>
  <value>com.odiago.flumebase.flume.FlumePlugin</value>
</property>
        

You may need to restart the Flume master process for this to take effect.

To learn how to start the server, see Section 7, “The FlumeBase server”.

4.2. Client configuration

Install a copy of FlumeBase on every client machine where users intend to submit queries to the FlumeBase system. The client must be able to open a TCP connection to the FlumeBase server. In order to view output events on the FlumeBase console, the server must be able to open a TCP connection back to the client.

Set the following properties in etc/flumebase-site.xml on the client machine:

Table 2. Configuration settings for FlumeBase clients

Property                  Value
flume.home                The path to $FLUME_HOME on the client.
flumebase.autoconnect     The host:port of the FlumeBase server to connect to. If set to local, this will use an in-process server. If set to none, the user must explicitly open a server connection with \open in the console.
flumebase.flow.autowatch  Defaults to true; this boolean property specifies whether you want every query to automatically send its output to the console when submitted. If false, you must explicitly watch flow output with the \watch command.
flumebase.console.port    FlumeBase uses a Thrift RPC connection to relay query output back to the client. The client listens on the port specified by this property.

5. Architecture

The FlumeBase system is composed of a command-line client, a server called the "execution environment," and the Flume system that collects and transports data. These may be configured as separate, distributed processes, or collocated on a single machine, or in a single process.

The command-line client is the simplest component in the product. This process is run directly by a user (perhaps on a server, but more often on her own desktop or laptop) and connects to the execution environment. The client provides the user with a prompt where new queries or control statements may be entered.

Each query (i.e., SELECT statement) produces a flow in the execution environment. The user may subscribe to running flows (this is done automatically for new flows created by the user). When a subscribed flow emits an output event, its text is printed to the client terminal.

Closing the client does not terminate any submitted flows. These are running in the execution environment, a separate long-lived process which may be shared by multiple users. An execution environment holds the definitions of all streams (created by CREATE STREAM statements), and processes the running flows. The execution environment is typically run on a dedicated server. For evaluation purposes, it may also be hosted inside the same process as a command-line client. (When the execution environment is embedded in the client, terminating the client will terminate all running flows, and discard knowledge of any streams.)

Submitted queries (flows) allow computation over streams of data. Streams are defined as a set of events, which are roughly analogous to "records" in a table-based SQL environment. These events are directly linked to "events" in Flume. Users define streams before querying them; these definitions specify the fields within each event, how to parse the event body into the fields, and where the stream originates. Each flow is itself a stream; its output is also a series of events, based on the computations specified by the user and the set of input events the flow operates over.

By default, queries submitted by users result in anonymous flows, which deliver their outputs only to the subscribed client instances. These flows continue to operate while no users are subscribed, but output events generated when no users are subscribed are simply dropped (there is no way to retrieve them later).

Users can bind a name to a running flow (or do so when submitting a flow with the CREATE STREAM AS SELECT syntax). This name is used as the name for a Flume logical node, which broadcasts the output of the flow as a set of Avro-encoded events. Users may then use the Flume shell to configure this logical node to direct a copy of its output to a monitoring application, persistent storage (such as HDFS), or elsewhere. Section 6.1.1.5, “CREATE STREAM AS SELECT” describes the CREATE STREAM AS SELECT syntax and its effects in greater detail. Section 8.4, “Controlling flows” describes how to manipulate flow names.

FlumeBase reads from a Flume network by modifying the sink definitions of nodes specified with CREATE STREAM statements. When a logical node is identified as a stream source, its sink definition is rewritten as a fan-out sink containing its original sink, and a new agent sink which forwards the node's output to a collector source hosted within the FlumeBase execution environment. (The FlumeBase execution environment will host an embedded Flume physical node, which then hosts logical nodes as necessary to receive and transmit streams of events.) When a stream is gracefully dropped (by using DROP STREAM to drop the stream, or \shutdown! to shut down the execution environment), the original logical node definition is restored to the logical node which provided the data stream.

Interaction between a FlumeBase execution environment and Flume is performed via the Flume master node's Thrift interface. The physical node hosted within an execution environment is controlled by the Flume master node and is, for all intents and purposes, an ordinary Flume node. For this reason, flows may take a few seconds to initialize (or cancel), as they depend on Flume for aspects of their configuration. Once initialized, flows should operate on events with low latency. If no external Flume network is available, you can configure the FlumeBase execution environment to host an embedded Flume master node, for evaluation or single-machine computation purposes.

6. The rtsql language

Users interact with FlumeBase by submitting commands and queries written in a language called rtsql. The rtsql language is designed to allow ongoing analysis of incoming data. The language is similar to SQL:2003; its syntax will be largely familiar to SQL experts. It also provides SQL:2003-style windowed operators which allow joining and aggregation over bounded amounts of time.

In rtsql, all data is consumed through streams. The FlumeBase architecture assumes that these streams cannot be replayed, and may be of infinite length. Therefore, all operators such as GROUP BY which can use “all the rows” as input are restricted so that they can only use windowed views into the stream. rtsql does allow a stream to be defined over a file. A SELECT statement querying such a stream will read the data in-order in the file and then terminate when it reaches the end of the file, but rtsql does not currently have special provisions for working with these data sources in a different fashion than Flume-based sources.

Keywords in rtsql are case-insensitive. Identifiers (stream, column, and function names, etc.) are translated to lower case for their canonical representation, unless they are "double-quoted," in which case they are interpreted literally.
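
The case-folding rule for identifiers can be modeled in a few lines. This is a hypothetical Python sketch, not FlumeBase code: the function name canonical_identifier is illustrative, and the sketch ignores escaped quote characters inside a quoted identifier, whose handling this guide does not specify.

```python
def canonical_identifier(token: str) -> str:
    """Return the canonical form of an rtsql identifier token (sketch).

    Unquoted identifiers fold to lower case; a "double-quoted" identifier
    keeps its exact spelling, with the surrounding quotes removed.
    """
    if len(token) >= 2 and token.startswith('"') and token.endswith('"'):
        return token[1:-1]
    return token.lower()


for tok in ['MyStream', 'mystream', '"MyStream"']:
    print(tok, '->', canonical_identifier(tok))
```

Under this rule MyStream and mystream name the same stream, while "MyStream" names a different one.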

6.1. DDL Commands

6.1.1. CREATE STREAM

The CREATE STREAM statement will create a stream definition which may be used in subsequent statements such as SELECT.

CREATE STREAM stream_name (col_name data_type [, ...])
    FROM [LOCAL] {FILE | NODE | SOURCE} input_spec
    [EVENT FORMAT format_spec [PROPERTIES (key = val, …)]]

CREATE STREAM stream_name AS select_statement

data_type ::= BOOLEAN | BINARY | BIGINT | INT | FLOAT | DOUBLE | PRECISE(int) | STRING | TIMESTAMP | LIST<data_type>
format_spec ::= 'delimited' | 'regex' | 'avro'
        

Section 6.3, “Data types and value ranges” describes the rtsql data types in greater detail.

input_spec is a 'single-quoted string' identifying the file name, Flume logical node, or Flume source specification (for FILE, NODE, or SOURCE, respectively) to use as the input for this stream.

File names are interpreted as Hadoop Path objects; they may specify the complete URI to a file, using any protocol permitted by the Hadoop common library. For example:

rtsql> CREATE STREAM foo (x STRING) FROM FILE
    -> 'hdfs://nn.example.com/user/aaron/foo.txt';
          

Unqualified file names are interpreted relative to the value of the fs.default.name configuration parameter. For example, if this were set to 'hdfs://nn.example.com', the following definition would be equivalent to the previous one:

rtsql> CREATE STREAM foo (x STRING) FROM FILE '/user/aaron/foo.txt';
          

Using the LOCAL keyword will cause the source definition to be interpreted relative to the local filesystem of the FlumeBase server. The following two statements are equivalent:

rtsql> CREATE STREAM foo (x STRING) FROM LOCAL FILE '/home/aaron/foo.txt';
rtsql> CREATE STREAM foo (x STRING) FROM FILE 'file:///home/aaron/foo.txt';
          

Note that if the FlumeBase server is on a different machine than the client, this will read from /home/aaron/foo.txt on the FlumeBase server -- not the client.
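
The file-name rules above can be summarized in a small Python sketch. It only restates the documented rules; it is not FlumeBase's code (which relies on Hadoop's Path class), the function name resolve_stream_path is hypothetical, and relative paths, which Hadoop resolves against a working directory, are deliberately left out.

```python
from urllib.parse import urlparse


def resolve_stream_path(path: str, default_fs: str, local: bool = False) -> str:
    """Qualify a FILE input_spec into a full URI (absolute paths only)."""
    if urlparse(path).scheme:
        return path                          # already a complete URI: use as-is
    if not path.startswith('/'):
        # Hadoop resolves relative paths against a working directory,
        # which this sketch does not model.
        raise ValueError('only absolute paths are modeled: %r' % path)
    if local:
        return 'file://' + path              # LOCAL: the server's local filesystem
    fs = urlparse(default_fs)                # the value of fs.default.name
    return '%s://%s%s' % (fs.scheme, fs.netloc, path)


print(resolve_stream_path('/user/aaron/foo.txt', 'hdfs://nn.example.com'))
print(resolve_stream_path('/home/aaron/foo.txt', 'hdfs://nn.example.com', local=True))
```

With fs.default.name set to 'hdfs://nn.example.com', the two calls reproduce the equivalences shown in the examples above.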

The EVENT FORMAT clause specifies how the bytes inside an event should be interpreted. By default, rtsql uses the delimited event format. Events are assumed to contain UTF-8 text representations of each field, separated by commas.

By specifying an EVENT FORMAT, you can choose which parser to apply to each event. The event format is specified as a 'quoted string'. The next few subsections define the available event formats.

You can further control the behavior of the event parser by specifying (key, value) pairs in the PROPERTIES section. The keys recognized are specific to each event format. Keys and values are both single-quoted strings.

6.1.1.1. Designated timestamp columns

When reading a stream from a file, there is no Flume timestamp to associate with each event. By default, FlumeBase will associate the current system timestamp as it reads each line of the file with the event generated for that line. This can be overridden by specifying the timestamp.col property in the PROPERTIES section of the CREATE STREAM statement. The timestamp.col must refer to a column of type TIMESTAMP. If the timestamp value for an event is null, the current system timestamp will be used instead.
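
The timestamp selection rule can be sketched as follows. This Python sketch is illustrative only: event_timestamp is a hypothetical name, rows are modeled as plain dictionaries, and timestamps are assumed to be milliseconds since the epoch.

```python
import time
from typing import Any, Dict, Optional


def event_timestamp(row: Dict[str, Any], timestamp_col: Optional[str] = None) -> int:
    """Choose the timestamp (ms since the epoch) for a file-sourced event."""
    if timestamp_col is not None:
        value = row.get(timestamp_col)
        if value is not None:
            return value                    # use the designated TIMESTAMP column
    # No timestamp.col property, or its value is null: use the current time.
    return int(time.time() * 1000)
```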

6.1.1.2. The delimited event format

The (default) delimited event format allows FlumeBase to interpret events consisting of UTF-8 encoded text. Individual fields are expected to be separated by commas, and all values must be represented as text. A BINARY column holds the bytes of the field's UTF-8 encoded text, up to (but not including) the field delimiter.

The delimiter character is controlled by the delimiter property. You may set this to any other character; for example, a pipe character:

  rtsql> CREATE STREAM x(a int, b int) FROM LOCAL FILE 'foo.txt'
      -> EVENT FORMAT 'delimited' PROPERTIES ('delimiter' = '|');
            

Nullable integer, timestamp, etc. fields are regarded as null if the field is an empty string (i.e., two delimiters occur in a row). A column of type STRING of zero length will be an empty string. NULL string values are, by default, indicated by the sequence \N. This sequence can be overridden by any other string with the null.sequence property.

Columns which are lists of other values (that is, columns with type LIST<t> where t is some other type, such as INT) contain an additional list delimiter, which separates the values within each list. The list delimiter is controlled by the list.delim property. The default value for list.delim is the pipe ("|") character.
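
The parsing rules of the delimited format can be modeled with a short Python sketch. It illustrates the documented behavior and is not FlumeBase's parser: the function names and the type encoding are hypothetical, only a few scalar types are handled, and the treatment of missing trailing fields and of empty list fields are assumptions this guide does not specify.

```python
from typing import Any, List, Optional, Tuple, Union

# Column types: a scalar name such as 'INT', or ('LIST', element_type).
ColType = Union[str, Tuple[str, str]]


def parse_scalar(text: str, col_type: str, null_sequence: str) -> Any:
    """Convert one field's text to a value of the given scalar type."""
    if col_type == 'STRING':
        # A zero-length field is an empty string; only null.sequence means NULL.
        return None if text == null_sequence else text
    if text == '':
        return None                          # empty non-string field -> NULL
    if col_type in ('INT', 'BIGINT'):
        return int(text)
    if col_type in ('FLOAT', 'DOUBLE'):
        return float(text)
    raise ValueError('type not modeled in this sketch: %s' % col_type)


def parse_delimited(event: str, columns: List[ColType], delimiter: str = ',',
                    null_sequence: str = '\\N', list_delim: str = '|') -> List[Any]:
    """Split one event body into typed column values."""
    fields: List[Optional[str]] = event.split(delimiter)
    # Assumption: missing trailing fields are treated as NULL.
    fields += [None] * (len(columns) - len(fields))
    row: List[Any] = []
    for text, col_type in zip(fields, columns):
        if text is None:
            row.append(None)
        elif isinstance(col_type, tuple):
            # LIST<t>: split on list.delim, then parse each element as type t.
            # Assumption: an empty list field is NULL, like other non-strings.
            row.append(None if text == '' else
                       [parse_scalar(x, col_type[1], null_sequence)
                        for x in text.split(list_delim)])
        else:
            row.append(parse_scalar(text, col_type, null_sequence))
    return row


print(parse_delimited('1,aaron,purple,42', ['INT', 'STRING', 'STRING', 'INT']))
```

Note how an empty INT field and the \N sequence in a STRING field both yield NULL, while an empty STRING field stays an empty string.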

6.1.1.3. The avro event format

FlumeBase can interpret events which contain a single serialized Avro record as a collection of fields. The event is assumed to be in the Avro binary encoding format. You must specify the schema property to describe the expected encoding schema. (This is in addition to the normal column definition section of the CREATE STREAM statement.) The schema is expected to be a single Avro record (with any name) which contains a set of fields; these fields must have the Avro types ("string", "long", etc.) that match the expected rtsql types (STRING NOT NULL, BIGINT NOT NULL, etc.). A nullable type (e.g., STRING) is expressed as an Avro union of ["string", "null"].
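
The following Python sketch shows one way to build a value for the schema property from a list of column definitions. Only the "string" (STRING) and "long" (BIGINT) correspondences and the ["string", "null"] union form come from this guide; the other entries of AVRO_TYPES are assumed to follow the usual Avro primitive types, and the function name avro_schema is hypothetical.

```python
import json
from typing import List, Tuple

# Assumed rtsql -> Avro primitive mapping (STRING and BIGINT are from the text).
AVRO_TYPES = {
    'STRING': 'string', 'BIGINT': 'long', 'INT': 'int', 'FLOAT': 'float',
    'DOUBLE': 'double', 'BOOLEAN': 'boolean', 'BINARY': 'bytes',
}


def avro_schema(record_name: str, columns: List[Tuple[str, str, bool]]) -> str:
    """Build a record schema JSON string from (name, rtsql_type, nullable) triples."""
    fields = []
    for name, rtsql_type, nullable in columns:
        avro_type = AVRO_TYPES[rtsql_type]
        # A nullable column is expressed as a union of the value type and "null".
        fields.append({'name': name,
                       'type': [avro_type, 'null'] if nullable else avro_type})
    return json.dumps({'type': 'record', 'name': record_name, 'fields': fields})


print(avro_schema('rec', [('id', 'BIGINT', False), ('name', 'STRING', True)]))
```

The resulting JSON string could then be supplied as the value of the schema property in the PROPERTIES clause.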

6.1.1.4. The regex event format

Another text-based event format, this format allows you to specify a regular expression whose groups are extracted as the columns. Each event is a single line of UTF-8 encoded text. The regex property is required. It should define as many capturing groups (with (parentheses)) as there are columns in the stream definition. The null.sequence and list.delim properties apply to this format as well.
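
A minimal Python sketch of regex-based extraction follows, assuming that the pattern is matched from the start of the line and that a non-matching line produces no row; this guide does not specify either behavior. The function name is hypothetical, and conversion of the raw strings to the declared column types is omitted.

```python
import re
from typing import List, Optional


def parse_regex_event(line: str, pattern: str, num_columns: int,
                      null_sequence: str = '\\N') -> Optional[List[Optional[str]]]:
    """Extract one raw text value per column from the regex's capturing groups."""
    compiled = re.compile(pattern)
    if compiled.groups != num_columns:
        raise ValueError('regex has %d groups but the stream has %d columns'
                         % (compiled.groups, num_columns))
    match = compiled.match(line)   # assumption: anchored at the start of the line
    if match is None:
        return None                # assumption: a non-matching event yields no row
    # Unmatched optional groups and the null sequence both become NULL.
    return [None if g is None or g == null_sequence else g for g in match.groups()]


print(parse_regex_event('aaron:purple:42', r'(\w+):(\w+):(\d+)', 3))
```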

6.1.1.5. CREATE STREAM AS SELECT

One of the most powerful uses of rtsql is as an inline processor of Flume events. The output of a FlumeBase flow can be used as a Flume source for further downstream processing or data collection. A named stream defined by CREATE STREAM AS SELECT causes the FlumeBase execution environment to host a Flume logical node with the same name as the stream. This logical node delivers all output events of the flow to its sink. The events are in binary-encoded Avro format: each is a record with the same name as the stream, whose field names equal the display names of the select expressions.

By default, the null sink is used for the logical node created by this syntax. You should use the Flume shell to reconfigure the logical node to deliver this output to other required sinks.

6.1.2. DROP STREAM

The DROP STREAM statement removes a stream definition created by CREATE STREAM.

DROP STREAM stream_name
        

When dropping a stream created in terms of a flow (CREATE STREAM AS SELECT), this will decommission the Flume logical node and drop the stream identifier, but will not cancel the flow itself. See Section 8.4, “Controlling flows” for more information on how to cancel the flow itself.

6.1.3. SHOW STREAMS

The SHOW STREAMS statement shows the definitions of all streams.

6.1.4. SHOW FUNCTIONS

The SHOW FUNCTIONS statement shows the definitions of all functions which may be applied to expressions in a statement. The output of this command is a list of functions and their types. Types are written in the form ((input_types) -> output_type).

rtsql> SHOW FUNCTIONS;
length ((STRING) -> INT)
...
          

The length function may take a STRING or NULL value, and returns an INT (or NULL, if the input was NULL).

Some functions are polymorphic -- their input types are flexible, subject to certain constraints, and their output types may or may not match their input types. For example, the sum function can operate over any numeric type:

rtsql> SHOW FUNCTIONS;
sum ((var('a, constraints={TYPECLASS_NUMERIC})) -> var('a, constraints={TYPECLASS_NUMERIC}))
...
          

The input argument’s type is var('a, constraints={TYPECLASS_NUMERIC}). This is a type variable with the name 'a (pronounced "alpha"), which can take any type subject to the constraint that it is in the typeclass "numeric" -- that is, it is one of INT, BIGINT, FLOAT, or DOUBLE. It is an error to take the sum of a STRING or BOOLEAN column.

The output type is the same type variable, 'a: whatever type is used for the input is also used as the output type. For more information on polymorphic types, see Section 6.3.10, “Polymorphic types and type classes”.
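
To illustrate how a type variable ties the output type to the input type, here is a hypothetical Python sketch that binds type variables from concrete argument types and checks their type-class constraints. It is not the rtsql type checker: the class and function names are invented for this example, and it ignores NULL arguments and the coercion rules of Section 6.3.9, “Type coercion”.

```python
from dataclasses import dataclass
from typing import Dict, FrozenSet, List, Union


@dataclass(frozen=True)
class TypeVariable:
    name: str                    # e.g. "'a"
    allowed: FrozenSet[str]      # concrete types permitted by the type class


RtType = Union[str, TypeVariable]   # a concrete type name, or a type variable

TYPECLASS_NUMERIC = frozenset({'INT', 'BIGINT', 'FLOAT', 'DOUBLE'})


def resolve_return_type(params: List[RtType], ret: RtType, args: List[str]) -> str:
    """Bind type variables from the argument types, then resolve the output type."""
    if len(params) != len(args):
        raise TypeError('expected %d arguments, got %d' % (len(params), len(args)))
    bindings: Dict[str, str] = {}
    for param, arg in zip(params, args):
        if isinstance(param, TypeVariable):
            if arg not in param.allowed:
                raise TypeError('%s is not in the type class of %s' % (arg, param.name))
            bound = bindings.setdefault(param.name, arg)
            if bound != arg:     # every use of one variable must agree
                raise TypeError('%s bound to %s, got %s' % (param.name, bound, arg))
        elif param != arg:
            raise TypeError('expected %s, got %s' % (param, arg))
    # A variable in the output position takes the type bound from the inputs.
    return bindings[ret.name] if isinstance(ret, TypeVariable) else ret


ALPHA = TypeVariable("'a", TYPECLASS_NUMERIC)
SUM_SIGNATURE = ([ALPHA], ALPHA)          # sum ((var('a, numeric)) -> var('a, numeric))
LENGTH_SIGNATURE = (['STRING'], 'INT')    # length ((STRING) -> INT)

print(resolve_return_type(*SUM_SIGNATURE, ['BIGINT']))
```

Calling sum on a BIGINT column binds 'a to BIGINT, so the result is BIGINT; calling it on a STRING column raises a TypeError.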

A complete list of functions included with rtsql is provided in Section 9, “Function Reference”.

6.1.5. DESCRIBE

The DESCRIBE statement shows the definition of a single object in rtsql:

DESCRIBE identifier        
        

This may be used to inspect a single stream, function, or other entity present in the symbol table.

The following statement displays the argument and return types for the length function:

rtsql> DESCRIBE length;
length ((STRING) -> INT)
          

6.1.6. EXPLAIN

The EXPLAIN statement shows the execution plan for an rtsql statement:

EXPLAIN statement        
        

This may be used to inspect the operation of any rtsql statement. The output of the command is a text description of how the statement was parsed (in a tree-based representation), followed by a control-flow graph of the steps applied in the runtime environment to satisfy the query.

rtsql> EXPLAIN SELECT x FROM foo;        
        

6.2. SELECT statements

The SELECT statement returns an event stream computed in terms of one or more existing event streams.

select_statement ::= SELECT select_expr, select_expr ... FROM stream_reference
    [ JOIN stream_reference ON join_expr OVER range_expr, JOIN ... ]
    [ WHERE where_condition ]
    [ GROUP BY column_list ]
    [ OVER range_expr ]
    [ HAVING having_condition ]
    [ WINDOW window_name AS ( range_expr ), WINDOW ... ]
      

A simple SELECT statement can return all events in a stream:

rtsql> SELECT * FROM foo;
        

It can also return only a specific subset of fields from the underlying stream:

rtsql> SELECT a, b, d FROM foo;
        

In addition to referencing specific fields, mathematical expressions may be calculated as well:

rtsql> SELECT 2 * a + 3 FROM foo;
        

The following table lists all available operators. Operators at one level of the table have higher priority than operators in a lower row of the table. Operators of the same priority are applied left-to-right. Parentheses can be used to override precedence. (This is the same precedence order as used by Java, for the subset of Java operators supported by rtsql.)

Table 3. Operator precedence rules in rtsql

Operator class             Operators
unary null operators       IS NULL, IS NOT NULL
unary operators            + - NOT
multiplicative             * / %
additive                   + -
comparison                 > < >= <=
equality                   = !=
logical conjunction        AND
logical disjunction        OR
function call              f(e1, e2, e3...)
identifiers and constants  x 42 'hello!'
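
To make the ordering concrete, the following Python sketch applies the binary-operator rows of the table with a precedence-climbing parser and returns each expression fully parenthesized. It is illustrative only: it is not rtsql's parser, the names are hypothetical, and it ignores unary operators, IS NULL, function calls, and string constants.

```python
import re
from typing import List

# Binary operator priorities taken from the table (higher binds tighter).
PRECEDENCE = {
    '*': 6, '/': 6, '%': 6,            # multiplicative
    '+': 5, '-': 5,                    # additive
    '>': 4, '<': 4, '>=': 4, '<=': 4,  # comparison
    '=': 3, '!=': 3,                   # equality
    'AND': 2,                          # logical conjunction
    'OR': 1,                           # logical disjunction
}

# Two-character operators are listed first so that '>=' is not split in two.
TOKEN_RE = re.compile(r'\s*(>=|<=|!=|[-+*/%<>=()]|\w+)')


def tokenize(text: str) -> List[str]:
    tokens, pos, text = [], 0, text.strip()
    while pos < len(text):
        m = TOKEN_RE.match(text, pos)
        if m is None:
            raise SyntaxError('unexpected input at %r' % text[pos:])
        tok = m.group(1)
        # Keywords are case-insensitive; normalize AND/OR to upper case.
        tokens.append(tok.upper() if tok.upper() in ('AND', 'OR') else tok)
        pos = m.end()
    return tokens


def parenthesize(text: str) -> str:
    """Return the expression with every binary operation explicitly grouped."""
    tokens = tokenize(text)
    pos = 0

    def primary() -> str:
        nonlocal pos
        tok = tokens[pos]
        pos += 1
        if tok == '(':
            inner = expression(1)
            pos += 1                   # consume the matching ')'
            return inner
        return tok                     # identifier or numeric constant

    def expression(min_prec: int) -> str:
        nonlocal pos
        left = primary()
        # Precedence climbing: keep absorbing operators that bind at least as
        # tightly as min_prec; equal priorities group left-to-right.
        while pos < len(tokens) and PRECEDENCE.get(tokens[pos], 0) >= min_prec:
            op = tokens[pos]
            pos += 1
            right = expression(PRECEDENCE[op] + 1)
            left = '(%s %s %s)' % (left, op, right)
        return left

    return expression(1)


print(parenthesize('a = 1 OR b > 2 AND c < 3'))
```

For example, AND binds more tightly than OR, so b > 2 AND c < 3 is grouped first, and 2 * a + 3 is read as (2 * a) + 3.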

Each selected expression may have an alias associated with it:

rtsql> SELECT 2 * a AS doubled FROM foo;
        

The AS keyword itself is optional.

This is especially useful in the context of nested SELECT statements:

rtsql> SELECT doubled FROM (SELECT 2 * a AS doubled FROM foo)
    -> AS q WHERE doubled > 4;
        

rtsql does not support the DISTINCT or ALL keywords; every query is implicitly "SELECT ALL."

6.2.1. Stream references

stream_reference ::= (stream_name | select_statement) [[AS] ref_name]
        

The stream_reference in a SELECT statement may literally identify a stream:

rtsql> CREATE STREAM foo (x string) FROM ...;
CREATE STREAM
rtsql> SELECT * FROM foo;
...
          

You may also qualify column names with their stream name:

rtsql> SELECT foo.x FROM foo;
          

And you may provide a reference name (ref_name) that is different from the stream name:

rtsql> SELECT v.x FROM verylongname AS v;
          

The AS keyword is optional. This is equivalent to:

rtsql> SELECT v.x FROM verylongname v;
          

A stream_reference may also be a nested SELECT statement.

rtsql> SELECT LENGTH(x) FROM (SELECT x FROM foo) AS f;
          

Each nested SELECT statement must be given a ref_name alias (f in the previous example). You do not need to qualify individual column names with the ref_name unless the column name would otherwise be ambiguous (e.g., if two sources are joined, and they each contain a column named x, then all references to x must be qualified with the source ref_name).

6.2.2. WHERE clauses

where_clause ::= WHERE bool_expr
        

A SELECT statement may filter some input events, and emit output events corresponding only to input events that match a boolean predicate.

rtsql> SELECT x FROM foo WHERE LENGTH(x) > 5;
          

This may be a compound boolean expression (using the AND and OR operators). rtsql does not support the IN or EXISTS operators. Subqueries are also not permitted in a WHERE clause.

6.2.3. JOIN clauses

join_clause ::= JOIN stream_reference ON join_expr OVER range_expr
        

A SELECT statement may correlate events from multiple sources and operate on their joined representation. In table-based SQL systems, any row of one table may be joined with any row of another table in a JOIN clause. Since FlumeBase operates over potentially infinite streams of data, this model would not scale. Instead, JOIN clauses require a window clause which defines the time-based boundaries within which a join may occur.

The only join expression supported is an equi-join; the join_expr must use the equality operator (=) to relate one field of each of the two joined streams.

The range_expr specifies the time range for the dependent (right) stream in which events may be joined to a given event of the primary (left) stream.

range_expr ::= RANGE INTERVAL expr time_scale PRECEDING
    | RANGE BETWEEN INTERVAL expr time_scale PRECEDING AND INTERVAL expr time_scale FOLLOWING

time_scale ::= SECONDS | MINUTES | HOURS | DAYS | WEEKS | MONTHS | YEARS
        

Consider the following example:

rtsql> SELECT * FROM f JOIN g ON f.x = g.y
    -> OVER RANGE INTERVAL 5 SECONDS PRECEDING;
          

This specifies that for each event seen in f (the primary stream), it may be joined with any events in g (the dependent stream) which occurred up to five seconds before the event in f.

The opposite time relation holds from the perspective of events in g: for each event in stream g, it may be joined with any events in f which occur up to five seconds later.

rtsql> SELECT * FROM f JOIN g ON f.x = g.y OVER
    -> RANGE BETWEEN INTERVAL 1 SECONDS PRECEDING
    -> AND INTERVAL 5 SECONDS FOLLOWING;
          

This example specifies that each event in f may be joined with any matching events in g which occurred up to one second before, or five seconds after, the event in f.
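
The join semantics described above can be sketched as a batch computation in Python. This is a model of the time-window rule only, not how FlumeBase evaluates joins over live streams (it holds every event in memory); the names are hypothetical, and the inclusive window boundaries are an assumption.

```python
from typing import List, Tuple

# An event is modeled as (timestamp_ms, join_key, payload).
Event = Tuple[int, str, str]


def windowed_join(f_events: List[Event], g_events: List[Event],
                  preceding_ms: int, following_ms: int = 0) -> List[Tuple[Event, Event]]:
    """Inner equi-join: pair each f event with g events that have the same key
    and a timestamp in [f_ts - preceding_ms, f_ts + following_ms]."""
    pairs = []
    for f_ev in f_events:
        f_ts, f_key, _ = f_ev
        lo, hi = f_ts - preceding_ms, f_ts + following_ms
        for g_ev in g_events:
            g_ts, g_key, _ = g_ev
            if g_key == f_key and lo <= g_ts <= hi:
                pairs.append((f_ev, g_ev))
    return pairs
```

RANGE INTERVAL 5 SECONDS PRECEDING corresponds to preceding_ms=5000 and following_ms=0; the BETWEEN example corresponds to preceding_ms=1000 and following_ms=5000.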

Only inner joins are supported at present. The INNER, OUTER, NATURAL, LEFT, RIGHT, and FULL keywords are not (yet) supported by rtsql.

6.2.4. Aggregation

Aggregate operators may be used in rtsql in a similar manner to ordinary SQL systems.

group_by_clause ::= GROUP BY col [, col...]
over_clause ::= OVER range_expr
having_clause ::= [ HAVING bool_expr ]
        

If an aggregate function (e.g., sum, count) is used in a SELECT statement, the statement must have an over_clause which defines the time window in which aggregate operators work. Aggregation over "the entire stream" is not supported.

The range_expr syntax is given in Section 6.2.3, “JOIN clauses”.

The following example provides a count of the number of events observed over a rolling five second window:

rtsql> SELECT COUNT(*) FROM foo OVER RANGE INTERVAL 5 SECONDS PRECEDING;
          

This may be further refined with a group_by_clause. For example:

rtsql> SELECT COUNT(*) FROM foo GROUP BY event_src
    -> OVER RANGE INTERVAL 5 SECONDS PRECEDING;
          

FlumeBase uses bucketing to support rolling time windows. By default, 100 buckets are used. So the previous two examples will support rolling counts with a "step" size of 50 milliseconds (5 seconds / 100 buckets).
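The bucketing arithmetic can be sketched in Python as follows. This illustrates the idea only and is not FlumeBase's aggregation code; the class name BucketedRollingCount and the alignment of buckets to multiples of the bucket width are assumptions.

```python
class BucketedRollingCount:
    """Rolling event count over a window divided into equal-width buckets.

    The bucket width (the "step") is window_ms // num_buckets. This sketch
    never evicts old buckets; a long-running operator would discard them.
    """

    def __init__(self, window_ms, num_buckets=100):
        self.width = window_ms // num_buckets
        self.num_buckets = num_buckets
        self.buckets = {}  # bucket start time (ms) -> event count

    def _start(self, ts_ms):
        return ts_ms - ts_ms % self.width

    def add(self, ts_ms):
        start = self._start(ts_ms)
        self.buckets[start] = self.buckets.get(start, 0) + 1

    def count_at(self, ts_ms):
        """Sum of the num_buckets buckets ending with the one containing ts_ms."""
        newest = self._start(ts_ms)
        oldest = newest - (self.num_buckets - 1) * self.width
        return sum(n for start, n in self.buckets.items() if oldest <= start <= newest)


counter = BucketedRollingCount(window_ms=5_000)  # 100 buckets by default
print(counter.width)                              # 50
for ts in (0, 49, 50, 4_999, 5_000):
    counter.add(ts)
print(counter.count_at(4_999))                    # 4
print(counter.count_at(5_000))                    # 3
```

Moving from t=4999 to t=5000 drops the entire oldest bucket (the events at t=0 and t=49) at once, which is what a 50 ms step size means in practice.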

You can specify the number of buckets by setting the flumebase.aggregation.buckets key in the session configuration (See Section 8.5, “Controlling the session configuration”). A larger number of buckets allows finer granularity in rolling windows, but may increase memory usage.

By default, windowed aggregation operators emit output groups only for buckets that contain data. For example, if data arrives at t=100 and t=150 and buckets are 10 ms wide, one output group is emitted with timestamp t=100 and another with timestamp t=150, but no counts or other aggregates are generated for the intermediate times t=110, t=120, etc. This behavior can be changed by setting flumebase.aggregation.continuous.output in the session configuration. Setting this flag to true causes the aggregation operator to emit output groups for all time intervals in which any data is available. This may be more useful for building time-series graphs, but less so for working with sporadic incoming data.
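The difference between the two output modes can be modeled as a function that lists the bucket timestamps at which output groups would be emitted. This is a simplified sketch assuming buckets aligned to multiples of their width (emission_times is a hypothetical name); in particular, it stops at the last bucket that received data, whereas the exact point at which FlumeBase stops emitting in continuous mode is not described here.

```python
def emission_times(arrivals_ms, width_ms, continuous=False):
    """Bucket start times at which a windowed aggregate emits an output group."""
    # Buckets that actually received data.
    starts = sorted({t - t % width_ms for t in arrivals_ms})
    if not continuous or not starts:
        return starts
    # Continuous mode: every bucket from the first to the last one with data.
    return list(range(starts[0], starts[-1] + width_ms, width_ms))


print(emission_times([100, 150], 10))                   # [100, 150]
print(emission_times([100, 150], 10, continuous=True))  # [100, 110, 120, 130, 140, 150]
```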

Note that by setting flumebase.aggregation.buckets to 1 and flumebase.aggregation.continuous.output to true, you can disable rolling windows, and instead divide the stream into discrete time-based groups.

This example gives a minute-by-minute count of hits from a web log:

rtsql> \set flumebase.aggregation.buckets=1;
rtsql> \set flumebase.aggregation.continuous.output=true;
rtsql> SELECT COUNT(*) as hits FROM httpd_log
    -> OVER RANGE INTERVAL 1 MINUTES PRECEDING;
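With a single bucket per window and continuous output, the rolling window degenerates into discrete, non-overlapping groups. A minimal Python sketch of that grouping, assuming each group is keyed by the start of its minute (tumbling_counts is a hypothetical name, not a FlumeBase function):

```python
from collections import Counter


def tumbling_counts(timestamps_ms, window_ms=60_000):
    """Count events per discrete, non-overlapping window of window_ms."""
    counts = Counter(t - t % window_ms for t in timestamps_ms)
    return dict(sorted(counts.items()))


print(tumbling_counts([0, 59_999, 60_000, 125_000]))  # {0: 2, 60000: 1, 120000: 1}
```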
          

Flume may deliver events out of order. FlumeBase tolerates out-of-order events that arrive no more than 200 milliseconds (the "slack interval") after they were expected; events that arrive later than this may be excluded from aggregate functions. The slack interval can be changed by setting flumebase.slack.time in the session configuration to a different integer number of milliseconds. (See Section 8.5, “Controlling the session configuration”.)
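One way to model the slack interval is to measure lateness against the newest timestamp seen so far. The sketch below assumes that interpretation of "when they are expected" and uses a hypothetical SlackFilter class; FlumeBase's actual bookkeeping may differ.

```python
class SlackFilter:
    """Accepts events whose timestamps lag the newest seen by at most slack_ms."""

    def __init__(self, slack_ms=200):  # 200 ms mirrors the documented default
        self.slack_ms = slack_ms
        self.newest_ms = None

    def accept(self, ts_ms):
        if self.newest_ms is None or ts_ms > self.newest_ms:
            self.newest_ms = ts_ms  # in-order event advances the clock
            return True
        # Out-of-order event: keep it only if it is within the slack interval.
        return self.newest_ms - ts_ms <= self.slack_ms


slack = SlackFilter()
print([slack.accept(t) for t in (1_000, 850, 800, 799)])  # [True, True, True, False]
```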

The aggregate functions available in rtsql are described in Section 9.2, “Aggregate functions”.

rtsql does not support full SQL:2003 windowed operators; a single range for the entire select statement is applied by the over_clause to all aggregate operators. This may change in a future version of FlumeBase.

6.2.4.1. Filtering aggregate data
having_clause ::= [ HAVING bool_expr ]         
          

A WHERE clause is evaluated before any grouping operators are applied. By contrast, the HAVING clause is applied after all grouping and projection operators (e.g., x AS y) have been applied. A HAVING clause filters events at the end of processing, emitting only those output events that satisfy the boolean predicate.

For example, this statement refines the previous example so that only event sources that emit more than 10 records in a five-second window are listed in the output:

rtsql> SELECT event_src, COUNT(event_src) AS cnt FROM foo
    -> GROUP BY event_src
    -> OVER RANGE INTERVAL 5 SECONDS PRECEDING
    -> HAVING cnt > 10;
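The ordering of WHERE and HAVING can be illustrated with a short Python sketch of the query above. It assumes the input events are already restricted to a single five-second window and are represented as dictionaries; grouped_count and this event layout are hypothetical.

```python
from collections import Counter


def grouped_count(events, where, having):
    """WHERE filters input events; HAVING filters the grouped output rows."""
    counts = Counter(e["event_src"] for e in events if where(e))
    rows = [{"event_src": src, "cnt": n} for src, n in counts.items()]
    return [row for row in rows if having(row)]


def more_than_10(row):
    return row["cnt"] > 10


events = [{"event_src": "a"}] * 11 + [{"event_src": "b"}] * 10
print(grouped_count(events, lambda e: True, more_than_10))
# [{'event_src': 'a', 'cnt': 11}]
print(grouped_count(events, lambda e: e["event_src"] != "a", more_than_10))
# []
```

In the second call the WHERE predicate removes source a before grouping, so no remaining group reaches the HAVING threshold.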
            

6.2.5. WINDOW clauses

window_clause ::= WINDOW window_name AS ( range_expr ), WINDOW ...
        

Windows may be defined with a window clause at the end of a SELECT statement. A window_name may be substituted for a range_expr anywhere in the SELECT statement:

rtsql> SELECT COUNT(*) FROM foo OVER mywin
    -> WINDOW mywin AS (RANGE INTERVAL 10 SECONDS PRECEDING);
          

6.2.6. Querying event properties and attributes

The columns of a stream event are extracted from the event body. In Flume, events have additional properties (the originating host, priority, and timestamp). Each event may also be decorated with additional arbitrary named attributes.

Named attributes of an event can be accessed with the syntax "#attrname". This acts like a column of type BINARY.

For example, to select events with the interesting attribute:

rtsql> SELECT * FROM foo WHERE #interesting IS NOT NULL;
          

Event attributes are defined as a STRING key and a BINARY value. To use these values as strings, use the BIN2STR() function:

rtsql> SELECT * FROM foo WHERE BIN2STR(#x) = 'abc';
          

A set of functions allowing you to access the host, priority, and timestamp properties of each event are described in Section 9.1, “Event property accessor functions”.

For example, to select events only at the ERROR priority level:

rtsql> SELECT * FROM foo WHERE PRIORITY() = 'ERROR';
          

The priority field is also available as an integer. More urgent priorities have lower ordinal values ('FATAL' is 0). To select events at the WARN level and more urgent:

rtsql> SELECT * FROM foo WHERE PRIORITY_LEVEL() <= 2;
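The relationship between PRIORITY() and PRIORITY_LEVEL() can be sketched as an integer enumeration. The values for FATAL (0) and the WARN threshold (2) come from the text above; the full ordering FATAL, ERROR, WARN, INFO, DEBUG, TRACE is an assumption based on Flume's priority levels.

```python
from enum import IntEnum


class Priority(IntEnum):
    # FATAL = 0 and WARN = 2 follow the text; the other ordinals are assumed.
    FATAL = 0
    ERROR = 1
    WARN = 2
    INFO = 3
    DEBUG = 4
    TRACE = 5


def warn_or_more_urgent(events):
    """Python analogue of WHERE PRIORITY_LEVEL() <= 2."""
    return [e for e in events if Priority[e["priority"]] <= Priority.WARN]


sample = [{"priority": "INFO"}, {"priority": "ERROR"}, {"priority": "WARN"}]
print(warn_or_more_urgent(sample))  # [{'priority': 'ERROR'}, {'priority': 'WARN'}]
```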
          

6.3. Data types and value ranges

Several data types are defined which can hold values of differing ranges. These column types do not follow ANSI SQL names; they are stored in underlying Java types and have the following ranges:

Table 4. rtsql Types and Ranges
rtsql type   Underlying Java type   Range
BOOLEAN      Boolean                true, false
BINARY       ByteBuffer             Any array of bytes
BIGINT       Long                   [-2^63, 2^63-1]
INT          Integer                [-2^31, 2^31-1]
FLOAT        Float                  [2^-149, (2-2^-23)*2^127] (magnitude of nonzero values, positive or negative)
DOUBLE       Double                 [2^-1074, (2-2^-52)*2^1023] (magnitude of nonzero values, positive or negative)
PRECISE(n)   BigDecimal             A BigDecimal with the scale property set to n
STRING       String                 A UTF-8-encoded string
TIMESTAMP    (internal)             (See Section 6.3.7, “The TIMESTAMP type”)
LIST<t>      List                   (See Section 6.3.8, “The LIST type”)

Like all keywords in rtsql, type names are case-insensitive.
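The FLOAT and DOUBLE bounds in Table 4 are the smallest positive (subnormal) and largest finite IEEE 754 values, the same values Java exposes as Float.MIN_VALUE/Float.MAX_VALUE and Double.MIN_VALUE/Double.MAX_VALUE. The following Python sketch checks the table's formulas against the bit patterns of those values, and checks the integer bounds with the struct module's signed 32- and 64-bit formats. It is a verification aid, not part of FlumeBase.

```python
import struct


def f32(bits):
    """Interpret a 32-bit pattern as an IEEE 754 single-precision value."""
    return struct.unpack(">f", struct.pack(">I", bits))[0]


def f64(bits):
    """Interpret a 64-bit pattern as an IEEE 754 double-precision value."""
    return struct.unpack(">d", struct.pack(">Q", bits))[0]


def fits(value, fmt):
    """True if value packs into the given signed struct format (">i" or ">q")."""
    try:
        struct.pack(fmt, value)
        return True
    except struct.error:
        return False


print(f32(0x00000001) == 2.0**-149)                              # True
print(f32(0x7F7FFFFF) == (2 - 2**-23) * 2.0**127)                # True
print(f64(0x0000000000000001) == 2.0**-1074)                     # True
print(f64(0x7FEFFFFFFFFFFFFF) == (2 - 2**-52) * 2.0**1023)       # True
print(fits(2**63 - 1, ">q"), fits(2**63, ">q"))                  # True False
print(fits(-2**31, ">i"), fits(-2**31 - 1, ">i"))                # True False
```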

6.3.1. Integral types

The BIGINT and INT types store integers as 64-bit and 32-bit signed values, respectively. The text representation of these values (e.g., in the delimited event format) is a base-10 integer.

6.3.2. The BOOLEAN type

The BOOLEAN type holds only true/false values. It cannot be coerced to or from an integer type. The text representations of these values are the UTF-8 strings true and false.

6.3.3. Floating-point types

The FLOAT and DOUBLE types hold floating-point values. Operations on floating-point values may be imprecise, subject to the constraints of the IEEE 754 floating-point standard. Any string accepted by java.lang.Float.valueOf() or java.lang.Double.valueOf() may be used as the text representation of these values.

6.3.4. Precise numeric types

The PRECISE() type constructor defines a family of numeric types providing specific degrees of precision. The argument to the type constructor specifies the "scale" of the value; if positive, this is the number of digits to the right of the decimal point. If 0, all digits are significant. Operations on PRECISE values preserve the scale of the value.
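Python's decimal module can illustrate what a fixed scale means. In this sketch, as_precise is a hypothetical helper, and rounding half up is an assumption: the text does not say how values with more fractional digits are rounded (in Java, BigDecimal.setScale takes an explicit rounding mode).

```python
from decimal import ROUND_HALF_UP, Decimal


def as_precise(value, scale):
    """Return value as a Decimal with exactly `scale` digits after the decimal point."""
    quantum = Decimal(1).scaleb(-scale)  # scale=2 -> 1E-2, scale=0 -> 1
    return Decimal(value).quantize(quantum, rounding=ROUND_HALF_UP)


print(as_precise("3.14159", 2))                         # 3.14
print(as_precise("2.5", 0))                             # 3
print(as_precise("1.1", 2) + as_precise("2.2", 2))      # 3.30
```

Adding two scale-2 values keeps two digits after the decimal point. Python's Decimal multiplication adds the scales of its operands, so this sketch does not show how FlumeBase treats products.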

6.3.5. The STRING type

The STRING type holds a UTF-8-encoded character string of arbitrary length. (Events in Flume have a maximum length, which defaults to 32 KB; the contents of all columns in an event together are subject to this limit.)

The empty string is a legal value in rtsql. In delimited events, a NULL string is denoted by a configurable escape sequence which defaults to the two characters \N. Delimited events may not contain the delimiter character itself; the ability to escape these characters is future work.

6.3.6. The BINARY type

The BINARY type holds a byte buffer of arbitrary bytes. When reading from an Avro input source, any byte array can be specified. When reading from text-based inputs, the byte array will be the bytes representing the UTF-8 encoding of the string input.

If a BINARY value is coerced to the STRING type (either implicitly, or explicitly through the BIN2STR() function), its bytes are decoded using the UTF-8 character set.

The STR2BIN() function will do the reverse, returning a BINARY object that explicitly represents the UTF-8 bytes of its input string argument.

Descriptions of functions that manipulate binary data are available in Section 9.4, “Functions to work with binary data”.

6.3.7. The TIMESTAMP type

rtsql treats a TIMESTAMP as distinct from other numerical values. A TIMESTAMP contains two fields: a milliseconds part and a nanoseconds part. The milliseconds part holds the number of milliseconds since the UNIX epoch (00:00:00 UTC, Jan 1, 1970). The nanoseconds part holds the number of nanoseconds after the specified millisecond, and should be less than 1,000,000.

Despite this internal precision, the delimited event format parses timestamps as a single 64-bit base-10 integer corresponding to the milliseconds part, and the nanoseconds part is 0. Furthermore, FlumeBase's internal notion of time and event-ordering works only at the granularity of milliseconds. The Avro event format allows the nanoseconds part to be specified. Even in this case, a column specified by timestamp.col (see Section 6.1.1.1, “Designated timestamp columns”) will associate only the milliseconds part of a timestamp with the event itself.
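The two-part TIMESTAMP can be sketched in Python as a small value class. The Timestamp class is hypothetical; from_delimited mirrors the delimited format described above, which supplies only the milliseconds part and leaves the nanoseconds part at 0.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Timestamp:
    millis: int     # milliseconds since 1970-01-01 00:00:00 UTC
    nanos: int = 0  # nanoseconds after `millis`, in [0, 1000000)

    def __post_init__(self):
        if not 0 <= self.nanos < 1_000_000:
            raise ValueError("nanos must be in [0, 1000000)")

    @classmethod
    def from_delimited(cls, text):
        # The delimited format carries a single base-10 millisecond count.
        return cls(int(text))

    def total_nanos(self):
        return self.millis * 1_000_000 + self.nanos


print(Timestamp.from_delimited("1306906081936"))  # Timestamp(millis=1306906081936, nanos=0)
print(Timestamp(5, 999_999).total_nanos())        # 5999999
```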

6.3.8. The LIST type

The LIST type allows you to represent a list of values as the value of a single column. A list may be operated on as a whole, or individual values may be extracted from it.

Values in a list must all have the same type. This type is specified with the syntax: LIST<t>, where t is the specification of another type. For example: LIST<INT>, or LIST<STRING NOT NULL>. You may not specify LIST without specifying the types of the members of the list.

Any type may appear in the parameter to the LIST type constructor. It is legal to specify, for example, LIST<LIST<INT NOT NULL>>. Note that LIST<INT> NOT NULL is different from both LIST<INT NOT NULL> and LIST<INT NOT NULL> NOT NULL, though all three of these are legal.

Lists can be parsed from events with the delimited event format; their elements are separated by pipe characters ("|"). You can override this by specifying the list.delim property of the event format.
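Parsing a list-valued column from a delimited event amounts to splitting the field on the list delimiter and converting each element. In this sketch, parse_list_field is hypothetical, and treating an empty field as an empty list is an assumption; the handling of empty fields and NULL elements is not specified above.

```python
def parse_list_field(field, element_parser, list_delim="|"):
    """Split one delimited-format column into typed list elements."""
    if field == "":
        return []  # assumption: an empty field is an empty list
    return [element_parser(tok) for tok in field.split(list_delim)]


print(parse_list_field("42|211|312", int))      # [42, 211, 312]
print(parse_list_field("a;b", str, ";"))        # ['a', 'b'] (list.delim overridden)
```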

Several functions exist to construct and manipulate lists; a reference is provided in Section 9.3, “Functions that operate on lists”.

6.3.9. Type coercion

Values of some types may be coerced to act as another type. This is performed automatically as necessary; no explicit type-casting operators are used (nor are they provided by the language). Coercion may only occur when moving to a "broader" type; this is called promotion in rtsql. The value itself may be modified to conform to the specific domain of the target type, but this is only performed if the modification would not lose data.

For example, an expression adding an INT value with a BIGINT value will provide a BIGINT result. The INT argument will be promoted to BIGINT and then added to the other BIGINT. Similarly, concatenating a STRING and an INT will coerce the integer into a string representation, and then concatenate it with the string.

Numeric values may be promoted to any broader numeric type. From narrowest to broadest, the numeric types are INT, BIGINT, FLOAT, and DOUBLE. PRECISE types may be promoted to any broader PRECISE type. INT and BIGINT promote to PRECISE(0), FLOAT promotes to PRECISE(24), and DOUBLE promotes to PRECISE(53).

All scalar types may be promoted to STRING. The result of coercing a value to STRING is the string representation of the value, as defined in the previous subsections.

Any type X NOT NULL may be promoted to its nullable counterpart.

A type LIST<X> may promote to LIST<Y> if X promotes to Y.
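The promotion rules above can be collected into a single Python predicate. Types are encoded as strings for scalars and as tuples for PRECISE(n), LIST<t>, and X NOT NULL; this encoding, and the reading of "broader PRECISE type" as "larger scale", are assumptions of the sketch rather than FlumeBase's internal representation.

```python
NUMERIC_ORDER = ["INT", "BIGINT", "FLOAT", "DOUBLE"]  # narrowest to broadest
PRECISE_SCALE_OF = {"INT": 0, "BIGINT": 0, "FLOAT": 24, "DOUBLE": 53}
SCALARS = {"BOOLEAN", "BINARY", "INT", "BIGINT", "FLOAT", "DOUBLE", "STRING", "TIMESTAMP"}


def is_scalar(t):
    return t in SCALARS or (isinstance(t, tuple) and t[0] == "PRECISE")


def promotes_to(src, dst):
    """True if a value of type src may be promoted to type dst."""
    if src == dst:
        return True
    if isinstance(dst, tuple) and dst[0] == "NOT NULL":
        # Only a NOT NULL type can promote to a NOT NULL type.
        return isinstance(src, tuple) and src[0] == "NOT NULL" and promotes_to(src[1], dst[1])
    if isinstance(src, tuple) and src[0] == "NOT NULL":
        # X NOT NULL -> nullable X, then promote X as usual.
        return promotes_to(src[1], dst)
    if isinstance(src, tuple) and src[0] == "LIST":
        # LIST<X> -> LIST<Y> if X -> Y; lists do not promote to scalars.
        return isinstance(dst, tuple) and dst[0] == "LIST" and promotes_to(src[1], dst[1])
    if dst == "STRING":
        return is_scalar(src)
    if src in NUMERIC_ORDER and dst in NUMERIC_ORDER:
        return NUMERIC_ORDER.index(src) <= NUMERIC_ORDER.index(dst)
    if isinstance(dst, tuple) and dst[0] == "PRECISE":
        if isinstance(src, tuple) and src[0] == "PRECISE":
            return src[1] <= dst[1]
        return src in PRECISE_SCALE_OF and PRECISE_SCALE_OF[src] <= dst[1]
    return False


print(promotes_to("INT", "BIGINT"), promotes_to("BIGINT", "INT"))     # True False
print(promotes_to(("LIST", "INT"), ("LIST", "BIGINT")))               # True
print(promotes_to("BOOLEAN", "INT"), promotes_to("BOOLEAN", "STRING"))  # False True
```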

6.3.10. Polymorphic types and type classes

Some functions may operate over a variety of types. For example, the sum function adds values of any numeric type. Since DOUBLE is the widest numeric type available, the argument type of sum could have been declared as DOUBLE. But that would promote every input to sum to a DOUBLE value, and the output would also be a DOUBLE. Instead, as the DESCRIBE statement shows, the type of the sum function is:

rtsql> DESCRIBE sum;
sum ((var('a, constraints={TYPECLASS_NUMERIC})) -> var('a, constraints={TYPECLASS_NUMERIC}))
          

In order to allow the sum function to accept a variety of input types, its input type is specified as a type variable with a universal type; this is denoted by the var('a) type. The name of this type variable is 'a and pronounced "alpha." Type variables are bound to concrete types (e.g., INT) when the rtsql statement is compiled. The same type variable specifies both the argument and return types of sum, so it returns the same type it receives as input. (This can be verified by observing that 'a is also the name of the output type.)

The sum function cannot sum a set of strings, however. Nor can it sum boolean values. Type variables may be specified with a set of constraints; the type a variable is bound to must conform to these constraints. The sum function can operate only over numeric values. To allow this constraint specification, rtsql provides a set of type classes, which are sets of types. Type classes are not themselves concrete types; a column cannot be declared with a type class. The available type classes are listed in Table 5, “Type classes and constraints in rtsql”.

In a function with multiple arguments, the same type variable may be used to specify one or more argument types, in addition to its return type. The type variable will take only one concrete type. Consider a (hypothetical) function mul(x, y) which returns the product of its arguments. The same type variable will be used for both arguments and the return type. It would be constrained to TYPECLASS_NUMERIC. If mul() were called on an INT and a BIGINT, the type variable would be bound to the narrowest concrete type which satisfies all arguments; in this case, BIGINT. Only the concrete types of the arguments are considered when binding a type variable; the context of the return type of the function is not considered.
Table 5. Type classes and constraints in rtsql
Type class                 Concrete types included                  Example use
TYPECLASS_NUMERIC          INT, BIGINT, FLOAT, DOUBLE               Input to and output of the sum function
TYPECLASS_COMPARABLE       All numeric types, STRING, and BOOLEAN   The max and min functions
TYPECLASS_ANY              All concrete types                       The count function
(any typeclass) NOT NULL   Each typeclass may be further specified as NOT NULL.
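The binding rule described for the hypothetical mul() function can be sketched in Python for the numeric case. bind_numeric_type_variable is a hypothetical helper; it only considers the four numeric types and ignores PRECISE and NOT NULL. Because those four types form a total order under promotion, the narrowest type that every argument promotes to is simply the broadest argument type.

```python
NUMERIC_ORDER = ["INT", "BIGINT", "FLOAT", "DOUBLE"]  # narrowest to broadest


def bind_numeric_type_variable(arg_types):
    """Bind one TYPECLASS_NUMERIC type variable shared by several arguments.

    Only the argument types are consulted, never the return-type context.
    """
    for t in arg_types:
        if t not in NUMERIC_ORDER:
            raise TypeError(f"{t} does not satisfy TYPECLASS_NUMERIC")
    # Narrowest common promotion target = broadest argument type.
    return max(arg_types, key=NUMERIC_ORDER.index)


print(bind_numeric_type_variable(["INT", "BIGINT"]))  # BIGINT
print(bind_numeric_type_variable(["FLOAT", "INT"]))   # FLOAT
```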

6.3.11. Variable-length function argument lists

Some functions allow a variable-length argument list. They may take a number of required arguments, and may then have a list of arguments of arbitrary length. The to_list() function, for example, will construct a list out of all its arguments. The following selects an empty list:

rtsql> select to_list() from x;
timestamp      to_list()
1306906081936  []
          

Whereas the following will construct a list of 3 elements:

rtsql> select to_list(42,211,312) from x;
timestamp        to_list(42, 211, 312)
1306906138072    [42, 211, 312]
          

Variable-length argument lists are denoted by an ellipsis (...) in the type signature of the function. For example:

rtsql> describe to_list;
to_list ((var('a, constraints={TYPECLASS_ANY})...) -> LIST<var('a, constraints={TYPECLASS_ANY})> NOT NULL)
          

7. The FlumeBase server

The FlumeBase server allows a server process to handle the execution of queries without requiring a long-lived shell session. FlumeBase shell instances can connect to local or remote server instances to submit queries. Before running a FlumeBase server, you should read Section 4.1, “Server configuration”.

7.1. Starting a foreground server

The FlumeBase server can be started as a foreground process by running bin/flumebase server from the directory where FlumeBase was installed.

7.2. Starting a daemon server

A FlumeBase server can also be run in the background, by running bin/flumebase start. Log output will be captured in a log file in $FLUMEBASE_LOG_DIR (default: $FLUMEBASE_HOME/logs). If the server is misbehaving, running bin/flumebase start -debug will enable debug logging.

7.3. Stopping the server

A running server may be stopped from within the FlumeBase shell. To learn how, see Section 8.2, “Connecting to the execution environment”.

To shut down the server from the command line, run bin/flumebase stop. This may take a few seconds, as FlumeBase sends a set of instructions to the Flume master to restore its flows to their original state and "unplug" the running queries. A hard kill (e.g., kill -9) of the FlumeBase server is not recommended.

8. The FlumeBase shell

The FlumeBase shell allows users to interact with the FlumeBase environment. The default FlumeBase configuration connects to a single-threaded execution environment within the same process as the shell. You may also connect to a remote execution environment running in another process (on the same or a different machine).

The FlumeBase shell can be used to transmit rtsql statements defined in Section 6, “The rtsql language” to the execution environment. Several control commands are also defined which allow users to interact with the shell or the environment itself.

8.1. Starting the shell

To start the FlumeBase shell, run bin/flumebase shell from the directory where FlumeBase is installed.

8.2. Connecting to the execution environment

As mentioned, FlumeBase's default configuration file causes an automatic connection to the local environment. You can connect to a remote environment with the command:

rtsql> \open server [port]
        

You can connect to the local (self-hosted) environment explicitly with the command:

rtsql> \open local
        

The shell can connect to at most one environment at a time. A \open command automatically disconnects from any previously-connected environment. You can explicitly disconnect from the environment with the command:

rtsql> \disconnect
        

You can close the shell with the command:

rtsql> \q
        

This is a synonym for:

rtsql> exit;
        

You can shut down the execution environment (which stops all running flows) with the command:

rtsql> \shutdown!
        

8.3. Monitoring flows

Each SELECT statement is instantiated as a flow in the execution environment. Flows are persistent: they continue to read data from the associated Flume sources indefinitely, even if the client disconnects.

You can get a list of all running flows with the command:

rtsql> \f
        

This returns the following fields of information:

Table 6. Columns in the running flows list
Column   Description
Watch?   Contains a * if you are watching the output of this flow on your console.
FlowId   The numeric id associated with the flow. Commands that control a flow use this id.
Stream   If the output of this flow is used as a stream (e.g., CREATE STREAM foo AS SELECT...), the name of the stream (foo) is shown here.
Query    The rtsql query that was used to create this flow.

By default, when you submit a flow (that is, run a SELECT statement), you are watching the output. Any events which are generated by this flow will be printed to your console. You can unwatch a flow with the \u or \unwatch commands.

The following command unwatches a flow with FlowId 3:

rtsql> \u 3          
        

(You can find the flowId for a flow with the \f command first.)

You can then resubscribe to a flow with the \w or \watch commands. This will resubscribe to the same flow’s output:

rtsql> \w 3
        

You can configure your session to not automatically watch flows as you create them; you will then need to explicitly watch any flows you create if you want to inspect their output. This property is controlled by the flumebase.flow.autowatch key in the session configuration (see Section 8.5, “Controlling the session configuration”).

8.4. Controlling flows

You can cancel a flow entirely with the \d and \D commands. Each takes a FlowId and destroys the associated flow. The \d command does not block; \D waits until the flow has stopped before returning:

rtsql> \d 3
        

If a flow was created via CREATE STREAM AS SELECT, the stream name associated with the flow can be removed, without stopping the flow itself, via the \dname command:

rtsql> CREATE STREAM foo AS SELECT x FROM bar;
Started flow: flow[mId=5]
rtsql> \dname 5
Removed stream name from flow 5
        

A new stream name can then be attached to the same flow:

rtsql> \name 5 baz
Created stream 'baz' on flow 5
        

A stream name can be attached to any running flow with the \name command. This creates a new Flume logical node with the same name as the stream; its source is populated with events containing Avro representations of the fields emitted by the flow. A flow can have at most one stream name attached at a time.

8.5. Controlling the session configuration

Each client session has a configuration associated with it; this is initially populated from the configuration files. The configuration is a set of key=val pairs, where both the keys and values are strings (although some keys are expected to have values which behave as integers, etc.). The configuration can be viewed with the command: \set

rtsql> \set
io.seqfile.compress.blocksize = '1000000'
io.skip.checksum.errors = 'false'
fs.checkpoint.size = '67108864'
...
        

Configuration keys are roughly hierarchical. You can view the configuration keys under any prefix that ends with a "." by typing \set prefix.:

rtsql> \set flumebase.
flumebase.flume.master.port = '35873'
flumebase.flume.master.host = 'localhost'
flumebase.autoconnect = 'local'
        

You can also view any specific key:

rtsql> \set flumebase.autoconnect
flumebase.autoconnect = 'local'
        

This command may also be used to set the value of any key:

rtsql> \set flumebase.flume.master.port=12345
flumebase.flume.master.port = '12345'
        

Setting configuration values does not modify the behavior of any previously-submitted flows. The behavior of a new flow may be controlled by setting configuration keys, and then submitting the query that generates the flow.
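The four forms of \set shown above can be summarized with a Python sketch that applies them to a plain dictionary of string keys and values. handle_set is hypothetical and only mimics the output format in the examples; listing keys in insertion order is an assumption, since the shell's ordering is not documented.

```python
def handle_set(conf, arg=""):
    """Mimic the \\set forms shown above against a dict of string values.

    ""           -> list every key
    "prefix."    -> list keys starting with the prefix
    "key"        -> show one key
    "key=value"  -> set the key, then show it
    """
    arg = arg.strip()
    if "=" in arg:
        key, value = (part.strip() for part in arg.split("=", 1))
        conf[key] = value  # values are stored as strings
        keys = [key]
    elif arg == "":
        keys = list(conf)
    elif arg.endswith("."):
        keys = [k for k in conf if k.startswith(arg)]
    else:
        keys = [arg] if arg in conf else []
    return [f"{k} = '{conf[k]}'" for k in keys]


conf = {
    "io.skip.checksum.errors": "false",
    "flumebase.flume.master.port": "35873",
    "flumebase.autoconnect": "local",
}
print(handle_set(conf, "flumebase."))
print(handle_set(conf, "flumebase.flume.master.port=12345"))
```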

8.6. Miscellaneous commands

At any time when inputting a command, the current input may be canceled by appending \c to the current line, and pressing enter:

rtsql> SELECT something where I made a typo
    -> and then continued \c
rtsql>
        

A help message which lists all available control commands can be accessed by typing help; or \h:

rtsql> \h
All text commands must end with a ';' character.
Session control commands must be on a line by themselves.

Session control commands:
  \c                    Cancel the current input statement.
  \d flowId             Drop the specified flow.
  \D flowId             Drop a flow and wait for it to stop.
...
        

9. Function Reference

This section describes all available functions to apply to values and events. A list of functions and their type signatures can also be accessed within the FlumeBase shell by typing SHOW FUNCTIONS;.

9.1. Event property accessor functions

Table 7. Event property accessor functions
Function           Accesses                               Type
EVENT_TIMESTAMP()  Event timestamp and nanos properties   TIMESTAMP NOT NULL
HOST()             Event origin host                      STRING NOT NULL
PRIORITY()         Event priority label                   STRING NOT NULL
PRIORITY_LEVEL()   Event priority as an integer           INT NOT NULL

9.2. Aggregate functions

The functions in this section all operate over a window of data and return aggregate values.

Table 8. Aggregate functions in rtsql
Function name   Description
COUNT(*)        Counts the number of events that match the group and time interval
COUNT(expr)     Counts the number of events where expr is non-null
SUM(expr)       Returns the sum of the values of expr
MAX(expr)       Returns the maximum value of expr
MIN(expr)       Returns the minimum value of expr
AVG(expr)       Returns the arithmetic mean of expr

rtsql does not support the COUNT(DISTINCT col) syntax.

9.3. Functions that operate on lists

The following functions construct and manipulate data of type LIST<t>.

Table 9. List functions in rtsql
Function name                    Description
CONCAT(LIST<'a>...)              Concatenates a set of lists into a single list; all items have the same type.
CONTAINS(LIST<'a> lst, 'a val)   Returns true if lst contains val.
INDEX(LIST<'a> lst, INT idx)     Returns the idx'th value in lst.
SIZE(LIST<'a> lst)               Returns the number of items in lst.
TO_LIST('a item...)              Returns a LIST<'a> containing the items specified as arguments.

9.4. Functions to work with binary data

Table 10. Binary functions in rtsql
Function name       Description
BIN2STR(BINARY b)   Returns a STRING containing the string representation of b. Assumes b is UTF-8 encoded.
STR2BIN(STRING s)   Returns a BINARY representation of the UTF-8 encoding of s.

9.5. Functions that operate on strings

Table 11. String functions in rtsql
Function name       Description
LENGTH(STRING s)    Returns the number of characters in s.
STR2BIN(STRING s)   Returns a BINARY representation of the UTF-8 encoding of s.

9.6. Functions that operate on timestamps

Table 12. Timestamp functions in rtsql
Function name         Description
CURRENT_TIMESTAMP()   Returns the current timestamp on the rtsql server.
EVENT_TIMESTAMP()     Returns the timestamp associated with the current event being processed.

9.7. Functions that operate on numbers

Table 13. Numeric functions in rtsql
Function name   Description
SQUARE(expr)    Returns the square of the numeric value that expr evaluates to.