FlumeBase is a database-inspired stream processing system built on top of Flume. This system allows users to dynamically insert queries into a data collection environment and inspect the stream of events being collected by Flume. These queries may spot-check incoming data, or specify persistent monitoring, data transformation, or quality filtering tasks. Queries are written in a SQL-like language called "rtsql."
FlumeBase can present data back to users of an interactive shell environment. It can also be configured to deliver streams of output events back into a Flume network, for consumption by other tools or persistence in HBase, HDFS, or other storage media.
The emphasis of this system is on low-latency analysis of incoming data being captured by Flume. The name "rtsql" (FlumeBase's query language) underscores the real-time nature of the query system, as well as the SQL-based origin of the query language syntax. It is hoped that FlumeBase will allow you to perform useful in-line data transformation or filtering, or time-sensitive alerting or tuning of a broader system, before subjecting the data being captured by Flume to a deeper (but perhaps higher-latency) analysis with other tools such as Hadoop MapReduce.
FlumeBase is an EXPERIMENTAL system! This is in no way ready for production use. Use this AT YOUR OWN RISK. Connecting this system to production Flume nodes may result in data loss, misconfiguration, or other serious problems.
This document explains how to install and configure the FlumeBase system. It then explains the rtsql language, used to submit queries to the runtime environment, and the commands used to control the terminal client itself. This document is intended for:
For those who understand Flume and SQL and just want to see a demo of what can be done with FlumeBase, follow the steps in this section. This is a five-minute tour of the FlumeBase world.
First, copy the following text into a file named data.txt.
1,aaron,purple,42
2,bob,blue,11
3,cindy,green,312
Install Flume 0.9.3, Hadoop 0.20, and Java 6. If you are running Cloudera's Distribution of Hadoop 3 beta 4 (CDH3B4), you have already installed all of these. Users of older versions of these products will need to upgrade. See Section 3, “Installation” for more thorough installation instructions.
Unzip the FlumeBase installation:
$ tar vzxf flumebase-(version).tar.gz
Start the FlumeBase shell:
$ cd flumebase-(version)/
$ bin/flumebase shell
By default, FlumeBase is configured with a self-contained environment that embeds the FlumeBase server and Flume itself within the same process as the shell. Now let's define a stream over the file, and query it.
rtsql> CREATE STREAM data(id int, name string, favcolor string,
    -> luckynumber int) FROM LOCAL SOURCE 'tail("/path/to/data.txt")';
CREATE STREAM
rtsql> SELECT * FROM data;
You created a stream which operates over a local (self-hosted) Flume logical node
which reads all the lines from data.txt. You then ran
a query that extracts all fields from each event in the stream. Each line
of the file corresponds to a different event.
In another terminal, now execute the following:
$ echo 4,dave,orange,611 >> /path/to/data.txt
You should observe that as soon as Flume detects the new record (about a second's delay), it will be passed along to FlumeBase and emitted on your console.
The submitted query has created a "flow," which runs as long as we allow it. If more data were to enter Flume via that file, we would continue to process it. Now, let's cancel that flow:
rtsql> \d 1
(As FlumeBase decommissions the internal logical node, there may be an error emitted by Flume itself; this is normal. In general, running in a single process will be "noisy" because of both client and server activity condensed to a single console. For a cleaner session experience, run the server and client in separate processes; see Section 3, “Installation” for instructions.)
And now let's run another query:
rtsql> SELECT favcolor FROM data WHERE luckynumber = 42;
After a few seconds, this flow is initialized with the data in the Flume
logical node. Note that only one row of our original data set is returned.
If you append more lines to the file whose luckynumber
column is 42, you'll see them appear in the FlumeBase
console.
This concludes our tour. To quit the FlumeBase shell, run:
rtsql> \q
The remaining sections of this user guide will describe multi-process configuration, the rtsql language, and shell operation in greater detail. Good luck!
FlumeBase requires a few prerequisites before it can be run on your machine:
The Java SE 6 JRE (or JDK) is required. Java downloads and installation instructions are available on Oracle's web site at http://www.oracle.com/technetwork/java/index.html.
Both other prerequisites can be installed from Cloudera's Distribution for Hadoop, version 3-beta-4 (CDH3b4) or newer. See http://archive.cloudera.com for instructions on downloading and installing Cloudera's Distribution for Hadoop.
While FlumeBase is written in Java and thus should be portable across a wide variety of operating systems, testing has only been performed under a Linux environment. It is likely to work under Cygwin and OS X as well, but no guarantees are made.
The following prerequisite knowledge is required to understand this documentation:
bash
By default, FlumeBase is configured to run in a single process combining both the interactive shell, and the execution engine. Terminating the shell will also terminate the execution environment, including all running queries. This is most useful for evaluating FlumeBase. For more serious use, the execution environment should be run in a persistent process on a server. Clients should be configured to connect to this server, or users should be instructed to explicitly do so.
To enable zero-configuration evaluation of FlumeBase, the FlumeBase process will also host an embedded Flume master node. To interact with existing streaming data sources, FlumeBase should instead be reconfigured to use the master of an existing Flume deployment.
Install FlumeBase on a server where the query execution engine should
be run. Then edit the etc/flumebase-site.xml file
to contain the following values:
| Property | Value |
| flume.home | The path to $FLUME_HOME on your server. |
| flumebase.remote.port | The port where the FlumeBase server listens for clients. |
| embedded.flume.master | This should be set to false if a Flume master is available. A value of true means that the FlumeBase environment acts as its own Flume master, separate from an existing Flume network. |
| flumebase.flume.master.host | The hostname of the external Flume master to connect to. |
| flumebase.flume.master.port | The port the external Flume master listens on. |
| flumebase.flume.collector.port.min/max | FlumeBase uses Flume collectors to receive data from the broader Flume network. Set ...port.min and ...port.max to the range of ports on the FlumeBase server which the FlumeBase daemon may use for this purpose. |
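To make the table concrete, the following Python sketch renders a server-side etc/flumebase-site.xml. It assumes the file uses the same Hadoop-style configuration/property/name/value layout as the flume-site.xml fragment in this section. Every value (paths, host names, ports) is a hypothetical placeholder, not a FlumeBase default.

```python
import xml.etree.ElementTree as ET

def site_xml(props: dict) -> str:
    """Render a Hadoop-style <configuration> document (assumed layout)."""
    root = ET.Element("configuration")
    for name, value in props.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    ET.indent(root, space="  ")          # pretty-print (Python 3.9+)
    return ET.tostring(root, encoding="unicode")

# Hypothetical values for a server that uses an existing Flume master.
server_props = {
    "flume.home": "/usr/lib/flume",
    "flumebase.remote.port": 9292,
    "embedded.flume.master": "false",
    "flumebase.flume.master.host": "flume-master.example.com",
    "flumebase.flume.master.port": 35873,
    "flumebase.flume.collector.port.min": 45000,
    "flumebase.flume.collector.port.max": 45100,
}

# The collector port range must not be empty.
assert (server_props["flumebase.flume.collector.port.min"]
        <= server_props["flumebase.flume.collector.port.max"])

if __name__ == "__main__":
    print(site_xml(server_props))
```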
A set of environment variables also govern how FlumeBase locates
various directories, and how FlumeBase instantiates its JVM. Default
values for these can be set in a file named etc/flumebase-env.sh.
A template version of this file is available for your convenience.
The FlumeBase server will use a pidfile to ensure that multiple instances
of the server do not contend for open ports, etc. The pidfile will be placed
in the directory identified by $FLUMEBASE_PID_DIR.
A FlumeBase server running in the background will write to a log file
in the directory identified by $FLUMEBASE_LOG_DIR.
Finally, to run in distributed mode, the Flume master node needs to
register the FlumeBase plugin. You should copy the
flumebase-(version).jar file from the FlumeBase
installation into /usr/lib/flume/lib on the Flume
master machine. Then edit flume-site.xml on
the master to include the setting:
<property>
  <name>flume.plugin.classes</name>
  <value>com.odiago.flumebase.flume.FlumePlugin</value>
</property>
You may need to restart the Flume master process for this to take effect.
To learn how to start the server, see Section 7, “The FlumeBase server”.
Install a copy of FlumeBase on every client machine where users intend to submit queries to the FlumeBase system. The client must be able to open a TCP connection to the FlumeBase server. In order to view output events on the FlumeBase console, the server must be able to open a TCP connection back to the client.
Set the following settings in etc/flumebase-site.xml
on the client machine:
| Property | Value |
| flume.home | The path to $FLUME_HOME on the client. |
| flumebase.autoconnect | The host:port of the FlumeBase server to connect to. If set to local, this will use an in-process server. If set to none, the user must explicitly open a server connection with \open in the console. |
| flumebase.flow.autowatch | Defaults to true; this boolean property specifies whether you want every query to automatically send its output to the console when submitted. If false, you must explicitly watch flow output with the \watch command. |
| flumebase.console.port | FlumeBase uses a Thrift RPC connection to relay query output back to the client. The client listens on the port specified by this property. |
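The three forms accepted by flumebase.autoconnect can be summarized with a small Python sketch. This is not FlumeBase's own parsing code. The exact-match handling of local and none, and the rejection of malformed values, are assumptions made for illustration.

```python
from typing import NamedTuple, Optional

class ConnectTarget(NamedTuple):
    mode: str                    # "remote", "local", or "none"
    host: Optional[str] = None
    port: Optional[int] = None

def parse_autoconnect(value: str) -> ConnectTarget:
    """Interpret a flumebase.autoconnect value (illustrative only)."""
    value = value.strip()
    if value == "local":
        return ConnectTarget("local")        # in-process server
    if value == "none":
        return ConnectTarget("none")         # user must run \open
    host, sep, port = value.rpartition(":")
    if not sep or not host or not port.isdigit():
        raise ValueError(f"expected host:port, 'local', or 'none': {value!r}")
    return ConnectTarget("remote", host, int(port))

print(parse_autoconnect("fb.example.com:9292"))
# ConnectTarget(mode='remote', host='fb.example.com', port=9292)
```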
The FlumeBase system is composed of a command-line client, a server called the "execution environment," and the Flume system that collects and transports data. These may be configured as separate, distributed processes, or collocated on a single machine, or in a single process.
The command-line client is the simplest component in the product. This process is run directly by a user (perhaps on a server, but more often on her own desktop or laptop). It connects to the execution environment and provides the user with a prompt, where new queries or control statements may be entered.
Each query (i.e., SELECT statement) produces a
flow in the execution environment. The user may
subscribe to running flows (this is done automatically for new flows
created by the user). When a subscribed flow emits an output event, its
text is printed to the client terminal.
Closing the client does not terminate any submitted flows. These are
running in the execution environment, a separate
long-lived process which may be shared by multiple users. An execution
environment holds the definitions of all streams (created by
CREATE STREAM statements), and processes the running flows. The
execution environment is typically run on a dedicated server. For
evaluation purposes, it may also be hosted inside the same process as a
command-line client. (When the execution environment is embedded
in the client, terminating the client will terminate all running
flows, and discard knowledge of any streams.)
Submitted queries (flows) allow computation over streams of data. Streams are defined as a set of events, which are roughly analogous to "records" in a table-based SQL environment. These events are directly linked to "events" in Flume. Users define streams before querying them; these definitions specify the fields within each event, how to parse the event body into the fields, and where the stream originates. Each flow is itself a stream; its output is also a series of events, based on the computations specified by the user and the set of input events the flow operates over.
By default, queries submitted by users result in anonymous flows, which deliver their outputs only to the subscribed client instances. These flows continue to operate while no users are subscribed, but output events generated when no users are subscribed are simply dropped (there is no way to retrieve them later).
Users can bind a name to a running flow (or do so when submitting a flow
with the CREATE STREAM AS SELECT syntax). This name is
used as the name for a Flume logical node, which broadcasts the output of the
flow as a set of Avro-encoded events. Users may then use the Flume shell
to configure this logical node to direct a copy of its output to a
monitoring application, persistent storage (such as HDFS), or elsewhere.
Section 6.1.1.5, “CREATE STREAM AS SELECT” describes the CREATE STREAM
AS SELECT syntax and its effects in greater detail. Section 8.4, “Controlling flows” describes how to manipulate flow names.
FlumeBase reads from a Flume network by modifying the sink
definitions of nodes specified with CREATE
STREAM statements. When a logical node is identified
as a stream source, its sink definition is rewritten as a
fan-out sink containing its original sink, and a new agent sink
which forwards the node's output to a collector source hosted
within the FlumeBase execution environment. (The FlumeBase execution
environment will host an embedded Flume physical node, which
then hosts logical nodes as necessary to receive and transmit
streams of events.) When a stream is gracefully dropped (by
using DROP STREAM to drop the stream, or
\shutdown! to shut down the execution
environment), the original logical node definition is restored
to the logical node which provided the data stream.
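The rewrite-and-restore cycle can be modeled in a few lines of Python. This is a toy model that assumes one stream per source node. Sink specifications are plain strings, and FANOUT(...) and AGENT_SINK(...) are placeholder notation, not real Flume sink syntax. The model shows that the original sink is saved before the rewrite, so dropping the stream restores it exactly.

```python
class SinkRewriter:
    """Toy model of the sink rewrite/restore cycle (one stream per node)."""

    def __init__(self, node_sinks):
        self.node_sinks = dict(node_sinks)   # logical node -> current sink spec
        self.saved = {}                      # logical node -> original sink spec

    def attach(self, node, collector):
        """CREATE STREAM: tee the node's output to a FlumeBase collector."""
        if node in self.saved:
            raise ValueError(f"{node} is already rewritten in this model")
        original = self.node_sinks[node]
        self.saved[node] = original
        # Placeholder notation, not real Flume sink syntax.
        self.node_sinks[node] = f"FANOUT({original}, AGENT_SINK({collector}))"

    def detach(self, node):
        """DROP STREAM or \\shutdown!: put the original sink back."""
        self.node_sinks[node] = self.saved.pop(node)

r = SinkRewriter({"web01": "console"})
r.attach("web01", "fb-server:45000")
print(r.node_sinks["web01"])   # FANOUT(console, AGENT_SINK(fb-server:45000))
r.detach("web01")
print(r.node_sinks["web01"])   # console
```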
Interaction between a FlumeBase execution environment and Flume is performed via the Flume master node's Thrift interface. The physical node hosted within an execution environment is controlled by the Flume master node and is, for all intents and purposes, an ordinary Flume node. For this reason, flows may take a few seconds to initialize (or cancel), as they depend on Flume for aspects of their configuration. Once initialized, flows should operate on events with low latency. If no external Flume network is available, you can configure the FlumeBase execution environment to host an embedded Flume master node, for evaluation or single-machine computation purposes.
Users interact with FlumeBase by submitting commands and queries written in a language called rtsql. The rtsql language is designed to allow ongoing analysis of incoming data. The language is similar to SQL:2003; its syntax will be largely familiar to SQL experts. It also provides SQL:2003-style windowed operators which allow joining and aggregation over bounded amounts of time.
In rtsql, all data is consumed through streams. The
FlumeBase architecture assumes that these streams cannot be replayed, and may
be of infinite length. Therefore, all operators such as GROUP
BY which can use “all the rows” as input are restricted so that
they can only use windowed views into the stream. rtsql does allow a
stream to be defined over a file. A SELECT statement
querying such a stream will read the data in file order and then
terminate when it reaches the end of the file, but rtsql does not
currently have special provisions for working with these data sources
in a different fashion than Flume-based sources.
Keywords in rtsql are case-insensitive. Identifiers (stream, column,
function names, etc) are translated to lower-case for their canonical
representation, unless they are "double-quoted" in
which case they are interpreted literally.
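The following minimal Python sketch shows the identifier rule. It assumes that a quoted identifier is simply a token wrapped in double quotes; escaped quotes inside an identifier are not modeled.

```python
def canonical_identifier(token: str) -> str:
    """Canonical form of an rtsql identifier token."""
    if len(token) >= 2 and token.startswith('"') and token.endswith('"'):
        return token[1:-1]     # double-quoted: used literally, case preserved
    return token.lower()       # unquoted: folded to lower case

def same_keyword(a: str, b: str) -> bool:
    """Keywords compare case-insensitively."""
    return a.upper() == b.upper()

print(canonical_identifier("MyStream"))     # mystream
print(canonical_identifier('"MyStream"'))   # MyStream
```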
The CREATE STREAM statement will create a stream
definition which may be used in subsequent statements such as
SELECT.
CREATE STREAM stream_name (col_name data_type [, ...])
    FROM [LOCAL] {FILE | NODE | SOURCE} input_spec
    [EVENT FORMAT format_spec [PROPERTIES (key = val, …)]]

CREATE STREAM stream_name AS select_statement

data_type ::= BOOLEAN | BINARY | BIGINT | INT | FLOAT | DOUBLE | PRECISE(int) | STRING | TIMESTAMP
format_spec ::= 'delimited' | 'regex' | 'avro'
Section 6.3, “Data types and value ranges” describes the rtsql data types in greater detail.
input_spec is a
'single-quoted-string' identifying the filename /
Flume logical node / Flume source specification to use as the input
for this stream.
File names are Hadoop Path objects; they may
specify the complete URI to a file, using any protocol permitted by
the Hadoop common library. e.g.:
rtsql> CREATE STREAM foo (x STRING) FROM FILE
    -> 'hdfs://nn.example.com/user/aaron/foo.txt';
Unqualified file names are interpreted relative to the value of the
fs.default.name configuration parameter. For
example, if this were set to
'hdfs://nn.example.com', the following
definition would be equivalent to the previous one:
rtsql> CREATE STREAM foo (x STRING) FROM FILE '/user/aaron/foo.txt';
Using the LOCAL keyword will cause the source
definition to be interpreted relative to the local filesystem of the
FlumeBase server. The following two statements are equivalent:
rtsql> CREATE STREAM foo (x STRING) FROM LOCAL FILE '/home/aaron/foo.txt';
rtsql> CREATE STREAM foo (x STRING) FROM FILE 'file:///home/aaron/foo.txt';
Note that if the FlumeBase server is on a different machine than the
client, this will read from /home/aaron/foo.txt
on the FlumeBase server -- not the client.
The EVENT FORMAT clause specifies how the bytes
inside an event should be interpreted. By default, rtsql uses the
delimited event format. Events are assumed to
contain UTF-8 text representations of each field, separated by commas.
By specifying an EVENT FORMAT, you can choose which
parser to apply to each event. The event format is specified as a
'quoted string'. The next few subsections define
the available event formats.
You can further control the behavior of the event parser
by specifying (key, value) pairs in the PROPERTIES
section. The keys recognized are specific to each event format. Keys
and values are both single-quoted strings.
When reading a stream from a file, there is no Flume timestamp to
associate with each event. By default, FlumeBase will associate the
current system timestamp as it reads each line of the file with the
event generated for that line. This can be overridden by specifying
the timestamp.col property in the
PROPERTIES section of the CREATE
STREAM statement. The timestamp.col
must refer to a column of type TIMESTAMP. If the
timestamp value for an event is null, the current system timestamp
will be used instead.
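The timestamp rule above amounts to a simple fallback. This Python sketch assumes that timestamps are epoch milliseconds and that a TIMESTAMP column value arrives as a timezone-aware datetime. Both are illustrative assumptions, and the function name is hypothetical.

```python
import time
from datetime import datetime, timezone
from typing import Optional

def event_timestamp_ms(row: dict, timestamp_col: Optional[str],
                       now_ms: Optional[int] = None) -> int:
    """Timestamp for an event read from a file (hypothetical helper)."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)   # system time at read, epoch ms
    if timestamp_col is None:
        return now_ms                       # no timestamp.col property
    value = row.get(timestamp_col)
    if value is None:
        return now_ms                       # null TIMESTAMP: fall back
    return int(value.timestamp() * 1000)    # use the column's value

row = {"ts": datetime(1970, 1, 1, 0, 0, 1, tzinfo=timezone.utc)}
print(event_timestamp_ms(row, "ts"))        # 1000
```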
The (default) delimited event format
allows FlumeBase to interpret events consisting of UTF-8
encoded text. Individual fields are expected to be
separated by commas. All values are expected to be
converted to text. BINARY columns are created
as the bytes holding a UTF-8 encoded string (which was
terminated by the field delimiter).
The delimiter character is controlled by the
delimiter property. You may set this to any
other character; for example, a pipe character:
rtsql> CREATE STREAM x(a int, b int) FROM LOCAL FILE 'foo.txt'
    -> EVENT FORMAT 'delimited' PROPERTIES ('delimiter' = '|');
Nullable integer, timestamp, etc. fields are regarded as null if the
field is an empty string (i.e., two delimiters occur in a row). A
column of type STRING of zero length will be an empty
string. NULL string values are, by default, indicated by the
sequence \N. This sequence can be overridden by
any other string with the null.sequence
property.
Columns which are lists of other values (that is, columns
with type LIST<t> where
t is some other type, such as
INT) contain an additional list
delimiter, which separates the values within
each list. The list delimiter is controlled by the
list.delim property. The default
value for list.delim is the pipe
("|") character.
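The following Python sketch applies the delimited rules described above:

* the field delimiter
* empty non-string fields read as NULL
* the \N marker for STRING columns
* list.delim for list columns

It models only STRING, INT, and LIST<INT> columns. It illustrates the rules and is not FlumeBase's parser. It does not cover how FlumeBase handles events with too few or too many fields.

```python
from typing import List, Tuple

def parse_delimited(body: bytes, columns: List[Tuple[str, str]],
                    delimiter: str = ",", null_sequence: str = "\\N",
                    list_delim: str = "|") -> dict:
    """Apply the delimited-format rules to one event body (sketch only).

    columns are (name, type) pairs; type is "STRING", "INT", or "LIST<INT>".
    Extra or missing fields are not handled: zip() stops at the shorter side.
    """
    fields = body.decode("utf-8").split(delimiter)
    row = {}
    for (name, typ), text in zip(columns, fields):
        if typ == "STRING":
            # Zero-length stays "", and the null sequence means NULL.
            row[name] = None if text == null_sequence else text
        elif text == "":
            row[name] = None                  # empty non-string field is NULL
        elif typ == "INT":
            row[name] = int(text)
        elif typ == "LIST<INT>":
            row[name] = [int(v) for v in text.split(list_delim)]
        else:
            raise ValueError(f"type not modeled in this sketch: {typ}")
    return row

cols = [("id", "INT"), ("name", "STRING"), ("nums", "LIST<INT>"), ("note", "STRING")]
print(parse_delimited(b"1,,4|5|6,\\N", cols))
# {'id': 1, 'name': '', 'nums': [4, 5, 6], 'note': None}
```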
FlumeBase can interpret events which contain a single serialized Avro
record as a collection of fields. The event is assumed to be in the
Avro binary encoding format. You must specify the
schema property to describe the expected
encoding schema. (This is in addition to the normal column
definition section of the CREATE STREAM
statement.) The schema is expected to be a single Avro record (with
any name) which contains a set of fields; these fields must have the
correct Avro types ("string",
"long", etc.) to match the expected rtsql types
(STRING NOT NULL, BIGINT NOT NULL, etc.).
A nullable type (e.g., STRING) is expressed as
an Avro union of ["string", "null"].
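This Python sketch builds a schema string for the schema property from a list of columns. The "string" and "long" mappings come from the text above. The other entries in the type table are assumptions based on Avro's primitive type names, and the record and column names are hypothetical.

```python
import json

# "string" and "long" are given above; the other mappings are assumptions.
AVRO_TYPES = {
    "STRING": "string",
    "BIGINT": "long",
    "INT": "int",
    "FLOAT": "float",
    "DOUBLE": "double",
    "BOOLEAN": "boolean",
    "BINARY": "bytes",
}

def avro_schema(record_name, columns):
    """columns: (name, rtsql_type, nullable) triples -> schema JSON text."""
    fields = []
    for name, rtsql_type, nullable in columns:
        avro_type = AVRO_TYPES[rtsql_type]
        if nullable:
            avro_type = [avro_type, "null"]   # nullable column -> union with null
        fields.append({"name": name, "type": avro_type})
    return json.dumps({"type": "record", "name": record_name, "fields": fields})

# Hypothetical columns: host STRING NOT NULL, bytes_sent BIGINT (nullable).
print(avro_schema("hit", [("host", "STRING", False), ("bytes_sent", "BIGINT", True)]))
```

The printed JSON is the kind of text that would go, as a single-quoted string, into the schema entry of the PROPERTIES section.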
Another text-based event format, this format allows you to specify
a regular expression, the groups of which are extracted as the columns.
Each event is a single line of UTF-8 encoded text. The
regex property is required. This should define
as many binding groups (with (parentheses)) as
columns are specified in the stream definition. The
null.sequence and list.delim
properties apply to this format as well.
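This Python sketch models the regex event format. It uses Python's re module in place of whatever regex engine FlumeBase uses internally. It assumes the pattern is anchored at the start of the line and that a non-matching line produces no event; the text above specifies neither behavior. Captured groups are returned as strings, and conversion to the declared column types is not modeled.

```python
import re

def parse_regex_event(body: bytes, pattern: str, column_names):
    """Extract one column per capture group from a UTF-8 line (sketch only)."""
    regex = re.compile(pattern)
    if regex.groups != len(column_names):
        raise ValueError("the pattern needs exactly one group per column")
    match = regex.match(body.decode("utf-8"))   # anchored at line start (assumed)
    if match is None:
        return None                             # assumed: non-matching line is dropped
    return dict(zip(column_names, match.groups()))

# Hypothetical access-log line: client address, HTTP method, status code.
print(parse_regex_event(b"10.0.0.1 GET 200", r"(\S+) (\S+) (\d+)",
                        ["ip", "method", "status"]))
```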
One of the most powerful uses of rtsql is as an inline processor of
Flume events. The output of a FlumeBase flow can be used as a Flume
source for further downstream processing or data collection. Named
streams, defined by CREATE STREAM AS SELECT will
cause the FlumeBase execution environment to host a Flume logical node
with the same name as the stream name. This logical node will
deliver to its sink all output events of the flow. The events will
be in binary-encoded Avro format; a record with the same name as the
stream, with field names equal to the display names of each select
expression.
By default, the null sink is used for the logical
node created by this syntax. You should use the Flume shell to
reconfigure the logical node to deliver this output to other
required sinks.
The DROP STREAM statement removes a stream
definition created by CREATE STREAM.
DROP STREAM stream_name
When dropping a stream created in terms of a flow (CREATE
STREAM AS SELECT), this will decommission the Flume logical
node and drop the stream identifier, but will not cancel the flow
itself. See Section 8.4, “Controlling flows” for more information
on how to cancel the flow itself.
The SHOW FUNCTIONS statement shows the definitions
of all functions which may be applied to expressions in a statement.
The output of this command is a list of functions and their types.
Types are written in the form ((input_types) ->
output_type).
rtsql> SHOW FUNCTIONS;
length ((STRING) -> INT)
...
The length function may take a
STRING or NULL value, and returns an
INT (or NULL, if the input was
NULL).
Some functions are polymorphic -- their input types are flexible,
subject to certain constraints, and their output types may or may not
match their input types. For example, the sum
function can operate over any numeric type:
rtsql> SHOW FUNCTIONS;
sum ((var('a, constraints={TYPECLASS_NUMERIC})) -> var('a, constraints={TYPECLASS_NUMERIC}))
...
The input argument’s type is var('a, constraints={TYPECLASS_NUMERIC}).
This is a type variable with the name
'a (pronounced "alpha"), and can take any type subject to the
constraint that it is in the typeclass "numeric" -- that is, it is one
of INT, BIGINT, FLOAT, or
DOUBLE. It is an error to take the sum of a
STRING or BOOLEAN column.
The output type is the same type variable, 'a ("alpha"): whatever type is bound for the input is also used as the output type. For more information on polymorphic types, see Section 6.3.10, “Polymorphic types and type classes”.
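This Python sketch shows how a type variable is bound and then reused. It is a simplified model, not rtsql's type checker. It requires argument types to match exactly and ignores NULL arguments and any implicit numeric widening rtsql may perform.

```python
from dataclasses import dataclass
from typing import Dict, List, Union

NUMERIC = frozenset({"INT", "BIGINT", "FLOAT", "DOUBLE"})

@dataclass(frozen=True)
class TypeVar:
    name: str               # e.g. "'a"
    allowed: frozenset      # typeclass constraint: permitted concrete types

Type = Union[str, TypeVar]

def result_type(params: List[Type], ret: Type, args: List[str]) -> str:
    """Bind type variables from the arguments, then resolve the return type."""
    if len(params) != len(args):
        raise TypeError("wrong number of arguments")
    bindings: Dict[str, str] = {}
    for param, arg in zip(params, args):
        if isinstance(param, TypeVar):
            if arg not in param.allowed:
                raise TypeError(f"{arg} violates the constraint on {param.name}")
            bound = bindings.setdefault(param.name, arg)
            if bound != arg:
                raise TypeError(f"{param.name} is already bound to {bound}")
        elif param != arg:
            raise TypeError(f"expected {param}, got {arg}")
    return bindings[ret.name] if isinstance(ret, TypeVar) else ret

ALPHA = TypeVar("'a", NUMERIC)
print(result_type([ALPHA], ALPHA, ["BIGINT"]))      # BIGINT (sum over BIGINT)
print(result_type(["STRING"], "INT", ["STRING"]))   # INT (length)
```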
A complete list of functions included with rtsql is provided in Section 9, “Function Reference”.
The DESCRIBE statement shows the definition of a
single object in rtsql:
DESCRIBE identifier
This may be used to inspect a single stream, function, or other entity present in the symbol table.
The following statement displays the argument and return types for the
length function:
rtsql> DESCRIBE length;
length ((STRING) -> INT)
The EXPLAIN statement shows the execution plan
for an rtsql statement:
EXPLAIN statement
This may be used to inspect the operation of any rtsql statement. The output of the command is a text description of how the statement was parsed (in a tree-based representation), followed by a control-flow graph of the steps applied in the runtime environment to satisfy the query.
rtsql> EXPLAIN SELECT x FROM foo;
The SELECT statement returns an event stream
computed in terms of one or more existing event streams.
select_statement ::= SELECT select_expr, select_expr ... FROM stream_reference
[ JOIN stream_reference ON join_expr OVER range_expr, JOIN ... ]
[ WHERE where_condition ]
[ GROUP BY column_list ]
[ OVER range_expr ]
[ HAVING having_condition ]
[ WINDOW window_name AS ( range_expr ), WINDOW ... ]
A simple SELECT statement can return all events in
a stream:
rtsql> SELECT * FROM foo;
It can also return only a specific subset of fields from the underlying stream:
rtsql> SELECT a, b, d FROM foo;
In addition to referencing specific fields, mathematical expressions may be calculated as well:
rtsql> SELECT 2 * a + 3 FROM foo;
The following table lists all available operators. Operators in a higher row of the table have higher priority than operators in a lower row. Operators of the same priority are applied left-to-right. Parentheses can be used to override precedence. (This is the same precedence order as used by Java, for the subset of Java operators supported by rtsql.)
| Operator class | Operators |
| unary null operators: | IS NULL, IS NOT NULL |
| unary operators: | + - NOT |
| multiplicative: | * / % |
| additive: | + - |
| comparison: | > < >= <= |
| equality: | = != |
| logical conjunction: | AND |
| logical disjunction: | OR |
| function call: | f(e1, e2, e3...) |
| identifiers and constants: | x 42 'hello!' |
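To see how the precedence table groups an expression, here is a small precedence-climbing parser in Python that fully parenthesizes binary expressions. It covers only the binary operators from the table. Unary operators, IS NULL, function calls, and quoted string constants are left out. It is a sketch for reading the table, not rtsql's grammar.

```python
import re

# Binary operators and their precedence levels from the table (higher binds tighter).
PRECEDENCE = {
    "OR": 1,
    "AND": 2,
    "=": 3, "!=": 3,
    ">": 4, "<": 4, ">=": 4, "<=": 4,
    "+": 5, "-": 5,
    "*": 6, "/": 6, "%": 6,
}
TOKEN = re.compile(r"\s*(>=|<=|!=|[-+*/%=<>()]|\w+)")

def tokenize(text):
    tokens, pos, text = [], 0, text.rstrip()
    while pos < len(text):
        m = TOKEN.match(text, pos)
        if m is None:
            raise SyntaxError(f"unexpected input at offset {pos}")
        tok = m.group(1)
        # AND/OR are keywords, so they are matched case-insensitively.
        tokens.append(tok.upper() if tok.upper() in ("AND", "OR") else tok)
        pos = m.end()
    return tokens

def parenthesize(text):
    """Return the expression with every binary operation parenthesized."""
    tokens, pos = tokenize(text), 0

    def operand():
        nonlocal pos
        tok = tokens[pos]
        pos += 1
        if tok != "(":
            return tok                      # identifier or numeric constant
        inner = expr(1)
        if pos >= len(tokens) or tokens[pos] != ")":
            raise SyntaxError("missing )")
        pos += 1
        return inner

    def expr(min_prec):
        nonlocal pos
        left = operand()
        while pos < len(tokens) and PRECEDENCE.get(tokens[pos], 0) >= min_prec:
            op = tokens[pos]
            pos += 1
            # Parsing the right side at a strictly higher level makes
            # operators of equal precedence group left-to-right.
            right = expr(PRECEDENCE[op] + 1)
            left = f"({left} {op} {right})"
        return left

    result = expr(1)
    if pos != len(tokens):
        raise SyntaxError("trailing input")
    return result

print(parenthesize("2 * a + 3"))                   # ((2 * a) + 3)
print(parenthesize("x = 1 OR y = 2 AND z = 3"))    # ((x = 1) OR ((y = 2) AND (z = 3)))
```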
Each selected expression may have an alias associated with it:
rtsql> SELECT 2 * a AS doubled FROM foo;
The AS keyword itself is optional.
This is especially useful in the context of nested
SELECT statements:
rtsql> SELECT doubled FROM (SELECT 2 * a AS doubled FROM foo)
    -> AS q WHERE doubled > 4;
rtsql does not support the DISTINCT or
ALL keywords; every query is implicitly
"SELECT ALL."
stream_reference ::= (stream_name | select_statement) [[AS] ref_name]
The stream_reference in a SELECT
statement may literally identify a stream:
rtsql> CREATE STREAM foo (x string) FROM ...;
CREATE STREAM
rtsql> SELECT * FROM foo;
...
You may also qualify column names with their stream name:
rtsql> SELECT foo.x FROM foo;
And you may provide a reference name (ref_name)
that is different than the stream name:
rtsql> SELECT v.x FROM verylongname AS v;
The AS keyword is optional. This is equivalent to:
rtsql> SELECT v.x FROM verylongname v;
A stream_reference may also be a nested
SELECT statement.
rtsql> SELECT LENGTH(x) FROM (SELECT x FROM foo) AS f;
Each nested SELECT statement must be given a
ref_name alias (f in the
previous example). You do not need to qualify individual column names
with the ref_name unless the column name would
otherwise be ambiguous (e.g., if two sources are joined, and they each
contain a column named x, then all references
to x must be qualified with the source
ref_name).
where_clause ::= WHERE bool_expr
A SELECT statement may filter some input events,
and emit output events corresponding only to input events that match a
boolean predicate.
rtsql> SELECT x FROM foo WHERE LENGTH(x) > 5;
This may be a compound boolean expression (using the
AND and OR operators). rtsql
does not support the IN or
EXISTS operators. Subqueries are also not
permitted in a WHERE clause.
join_clause ::= JOIN stream_reference ON join_expr OVER range_expr
A SELECT statement may correlate events from
multiple sources and operate on their joined representation. In
table-based SQL systems, any row of one table may be joined with any
row of another table in a JOIN clause. Since FlumeBase
operates over potentially infinite streams of data, this model would
not scale. Instead, JOIN clauses require a window
clause which defines the time-based boundaries within which a join may
occur.
The only join expression supported is an equi-join; the
join_expr must use the equality operator
(=) to relate one field of each of the two joined
streams.
The range_expr specifies the time range for the
dependent (right) stream in which events may be joined to a given
event of the primary (left) stream.
range_expr ::= RANGE INTERVAL expr time_scale PRECEDING
             | RANGE BETWEEN INTERVAL expr time_scale PRECEDING AND INTERVAL expr time_scale FOLLOWING
time_scale ::= SECONDS | MINUTES | HOURS | DAYS | WEEKS | MONTHS | YEARS
Consider the following example:
rtsql> SELECT * FROM f JOIN g ON f.x = g.y
    -> OVER RANGE INTERVAL 5 SECONDS PRECEDING;
This specifies that for each event seen in f
(the primary stream), it may be joined with any events in
g (the dependent stream) which occurred up to
five seconds before the event in f.
The opposite time relation holds from the perspective of events in
g: for each event in stream
g, it may be joined with any events in
f which occur up to five seconds later.
rtsql> SELECT * FROM f JOIN g ON f.x = g.y OVER
    -> RANGE BETWEEN INTERVAL 1 SECONDS PRECEDING
    -> AND INTERVAL 5 SECONDS FOLLOWING;
This example specifies that each event in f may
be joined with any matching events in g which
occurred up to one second before, or five seconds after, the event in
f.
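A naive nested-loop join in Python can check the window semantics of the two examples. Treating both window boundaries as inclusive is an assumption. A real streaming engine would also buffer only the dependent-stream events still inside the window, rather than scanning everything.

```python
from typing import List, Tuple

Event = Tuple[int, str, str]   # (timestamp_ms, join_key, payload)

def windowed_join(left: List[Event], right: List[Event],
                  preceding_ms: int, following_ms: int = 0):
    """Naive equi-join: right event at tr joins left event at tl when the
    keys match and tl - preceding_ms <= tr <= tl + following_ms."""
    out = []
    for tl, key_l, payload_l in left:
        for tr, key_r, payload_r in right:
            if key_l == key_r and tl - preceding_ms <= tr <= tl + following_ms:
                out.append((tl, payload_l, payload_r))
    return out

f = [(10_000, "k", "f1")]
g = [(4_000, "k", "g_old"), (9_000, "k", "g_recent"),
     (12_000, "k", "g_later"), (9_500, "other", "g_other_key")]

# RANGE INTERVAL 5 SECONDS PRECEDING
print(windowed_join(f, g, preceding_ms=5_000))
# RANGE BETWEEN INTERVAL 1 SECONDS PRECEDING AND INTERVAL 5 SECONDS FOLLOWING
print(windowed_join(f, g, preceding_ms=1_000, following_ms=5_000))
```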
Only inner joins are supported at present. The INNER,
OUTER, NATURAL, LEFT,
RIGHT, and FULL keywords are not (yet)
supported by rtsql.
Aggregate operators may be used in rtsql in a similar manner to ordinary SQL systems.
group_by_clause ::= GROUP BY col [, col ...]
over_clause ::= OVER range_expr
having_clause ::= [ HAVING bool_expr ]
If an aggregate function (e.g., sum,
count) is used in a SELECT
statement, the statement must have an over_clause
which defines the time window in which aggregate operators work.
Aggregation over "the entire stream" is not supported.
The range_expr syntax is given in
Section 6.2.3, “JOIN clauses”.
The following example provides a count of the number of events observed over a rolling five second window:
rtsql> SELECT COUNT(*) FROM foo OVER RANGE INTERVAL 5 SECONDS PRECEDING;
This may be further refined with a group_by_clause. For example:
rtsql> SELECT COUNT(*) FROM foo GROUP BY event_src
    -> OVER RANGE INTERVAL 5 SECONDS PRECEDING;
FlumeBase uses bucketing to support rolling time windows. By default, 100 buckets are used. So the previous two examples will support rolling counts with a "step" size of 50 milliseconds (5 seconds / 100 buckets).
You can specify the number of buckets by setting the
flumebase.aggregation.buckets key in the session
configuration (See Section 8.5, “Controlling the session configuration”).
A larger number of buckets allows finer granularity in rolling
windows, but may increase memory usage.
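The bucketing arithmetic can be sketched in Python as a rolling COUNT(*) that keeps one counter per bucket. This illustrates the general technique, not FlumeBase's operator. It assumes timestamps in milliseconds that arrive in non-decreasing order, and a window that divides evenly into buckets.

```python
from collections import deque

class RollingCount:
    """Bucketed COUNT(*) OVER RANGE INTERVAL window PRECEDING (sketch)."""

    def __init__(self, window_ms: int, num_buckets: int = 100):
        self.step_ms = window_ms // num_buckets   # 5000 ms / 100 buckets = 50 ms
        if self.step_ms <= 0:
            raise ValueError("window too small for this many buckets")
        self.num_buckets = num_buckets
        self.buckets = deque()                    # (bucket_start_ms, count), oldest first

    def add(self, ts_ms: int) -> int:
        """Record one event and return the count over the current window."""
        start = ts_ms - ts_ms % self.step_ms      # align to a bucket boundary
        if self.buckets and self.buckets[-1][0] == start:
            s, c = self.buckets.pop()
            self.buckets.append((s, c + 1))
        else:
            self.buckets.append((start, 1))
        # Keep only the newest num_buckets buckets' worth of time.
        oldest_allowed = start - (self.num_buckets - 1) * self.step_ms
        while self.buckets[0][0] < oldest_allowed:
            self.buckets.popleft()
        return sum(c for _, c in self.buckets)

rc = RollingCount(window_ms=5_000, num_buckets=100)
print(rc.step_ms)                                     # 50
print([rc.add(t) for t in (0, 20, 4_990, 5_000)])     # [1, 2, 3, 2]
```

In the last step, the event at t=5000 opens a new bucket. The bucket starting at t=0 (holding two events) slides out of the window, so the count drops from 3 to 2.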
By default, windowed aggregation operators will emit output groups
only when the corresponding input buckets contained data. For
example, if data arrives at t=100 and t=150, and buckets are 10 ms
wide, an output group will be emitted with timestamp t=100 and
another one for t=150, but no intermediate counts or other
aggregates will be generated for t=110, t=120, etc.
This behavior can be configured by setting
flumebase.aggregation.continuous.output in the
session configuration. Setting this flag to true
will cause the aggregation operator to emit output groups for all
time intervals in which any data is available. This may prove more
useful for building time-series graphs, etc., but less so for working
with sporadic incoming data.
Note that by setting flumebase.aggregation.buckets
to 1 and
flumebase.aggregation.continuous.output to
true, you can disable rolling windows, and instead
divide the stream into discrete time-based groups.
This example gives a minute-by-minute count of hits from a web log:
rtsql> \set flumebase.aggregation.buckets=1;
rtsql> \set flumebase.aggregation.continuous.output=true;
rtsql> SELECT COUNT(*) as hits FROM httpd_log
    -> OVER RANGE INTERVAL 1 MINUTES PRECEDING;
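With a single bucket, each one-minute window is one group and consecutive groups do not overlap, so every event is counted exactly once. In a rolling window, by contrast, an event contributes to every output step whose window covers it. Below is a Python sketch of the tumbling count, assuming epoch-millisecond timestamps. It reports only minutes that contain events.

```python
from collections import Counter

MINUTE_MS = 60_000

def hits_per_minute(timestamps_ms):
    """Tumbling one-minute COUNT(*): each event lands in exactly one group."""
    counts = Counter(ts - ts % MINUTE_MS for ts in timestamps_ms)
    return sorted(counts.items())    # [(minute_start_ms, hits), ...]

print(hits_per_minute([0, 59_999, 60_000, 125_000]))
# [(0, 2), (60000, 1), (120000, 1)]
```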
Flume may deliver events out of order; FlumeBase tolerates improperly
ordered events that arrive within the "slack interval" of 200
milliseconds of when they are expected. Events that arrive after the
slack interval may be excluded from aggregate functions.
The slack interval may be configured by setting
flumebase.slack.time
to a different integer number of milliseconds in the session configuration.
(See Section 8.5, “Controlling the session configuration”.)
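One way to picture the slack interval is a filter that measures lateness against the newest timestamp seen so far. The Python sketch below makes that assumption; the text above does not say exactly how the "expected" time is tracked. It also treats an event exactly slack_ms late as on time.

```python
from typing import Optional

class SlackFilter:
    """Admit events at most slack_ms older than the newest seen (sketch)."""

    def __init__(self, slack_ms: int = 200):
        self.slack_ms = slack_ms                  # mirrors flumebase.slack.time
        self.high_water_ms: Optional[int] = None  # newest timestamp so far

    def accept(self, ts_ms: int) -> bool:
        if self.high_water_ms is None or ts_ms > self.high_water_ms:
            self.high_water_ms = ts_ms            # in-order event advances time
            return True
        return self.high_water_ms - ts_ms <= self.slack_ms

sf = SlackFilter(slack_ms=200)
print([sf.accept(t) for t in (1_000, 900, 1_300, 1_050, 1_100)])
# [True, True, True, False, True]
```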
The set of aggregate functions available in rtsql are described in Section 9.2, “Aggregate functions”.
rtsql does not support full SQL:2003
windowed operators; a single range for the entire select statement is
applied by the over_clause to all aggregate operators.
This may change in a future version of FlumeBase.
having_clause ::= [ HAVING bool_expr ]
A WHERE clause is evaluated before any grouping
operators are applied. By contrast, the HAVING
clause is applied after all grouping and projection operators (e.g.,
x AS y) have been applied. A
HAVING clause filters events at the end of
processing, emitting only those output events that satisfy the
boolean predicate.
For example, this statement filters the previous example such that only event sources that emit more than 10 records in a five-second window are listed as output:
rtsql> SELECT event_src, COUNT(event_src) AS cnt FROM foo
    -> GROUP BY event_src
    -> OVER RANGE INTERVAL 5 SECONDS PRECEDING
    -> HAVING cnt > 10;
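The ordering of WHERE and HAVING can be modeled in a few lines of Python. This sketch ignores the time window entirely and uses a hypothetical count_by_source function; it only illustrates that WHERE filters input events while HAVING filters aggregated output rows, which is why HAVING can refer to the alias cnt.

```python
from collections import Counter


def count_by_source(events, where=lambda e: True, having=lambda row: True):
    """Hypothetical model of SELECT event_src, COUNT(event_src) AS cnt ... GROUP BY event_src."""
    # WHERE: drop input events before any grouping happens.
    counts = Counter(e["event_src"] for e in events if where(e))
    # GROUP BY event_src with the projection COUNT(event_src) AS cnt.
    rows = [{"event_src": src, "cnt": n} for src, n in sorted(counts.items())]
    # HAVING: filter finished output rows; the alias cnt is visible here.
    return [row for row in rows if having(row)]
```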
window_clause ::= WINDOW window_name AS ( range_expr ), WINDOW ...
Windows may be defined with a window clause at the end of a
SELECT statement. A
window_name may be substituted for a
range_expr anywhere in the
SELECT statement:
rtsql> SELECT COUNT(*) FROM foo OVER mywin
    -> WINDOW mywin AS (RANGE INTERVAL 10 SECONDS PRECEDING);
The columns of a stream event are extracted from the event body. In Flume, events have additional properties (the originating host, priority, and timestamp). Each event may also be decorated with additional arbitrary named attributes.
Named attributes of an event can be accessed with the syntax
"#attrname". This acts like a column of
type BINARY.
For example, to select events that have an attribute named interesting:
rtsql> SELECT * FROM foo WHERE #interesting IS NOT NULL;
Event attributes are defined as a STRING key and a BINARY value. To
use these values as strings, use the BIN2STR()
function:
rtsql> SELECT * FROM foo WHERE BIN2STR(#x) = 'abc';
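A rough Python model of the two queries above follows. The event layout (a dict of attrs mapping names to bytes) is an assumption for illustration, as is the rule that a missing attribute reads as NULL; bin2str here is a stand-in for BIN2STR(), not its implementation.

```python
def bin2str(value):
    """Stand-in for BIN2STR(): decode BINARY bytes as UTF-8; NULL (None) stays NULL."""
    return None if value is None else value.decode("utf-8")


# Hypothetical events: a body plus named attributes (str -> bytes).
events = [
    {"body": "e1", "attrs": {"interesting": b"yes", "x": b"abc"}},
    {"body": "e2", "attrs": {"x": b"xyz"}},
    {"body": "e3", "attrs": {}},
]

# WHERE #interesting IS NOT NULL (assumption: a missing attribute is NULL).
interesting = [e["body"] for e in events if e["attrs"].get("interesting") is not None]

# WHERE BIN2STR(#x) = 'abc'
x_is_abc = [e["body"] for e in events if bin2str(e["attrs"].get("x")) == "abc"]
```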
A set of functions allowing you to access the host, priority, and timestamp properties of each event are described in Section 9.1, “Event property accessor functions”.
For example, to select events only at the ERROR priority level:
rtsql> SELECT * FROM foo WHERE PRIORITY() = 'ERROR';
The priority field is also available as an integer. More urgent priorities have
lower ordinal values ('FATAL' is 0).
To select events at the WARN level and more urgent:
rtsql> SELECT * FROM foo WHERE PRIORITY_LEVEL() <= 2;
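The integer priority levels can be sketched as a Python IntEnum. The text fixes only FATAL at 0 and places WARN at 2; the remaining ordinals assume Flume's usual priority order (FATAL, ERROR, WARN, INFO, DEBUG, TRACE). warn_or_worse is a hypothetical helper mirroring the query above.

```python
from enum import IntEnum


class Priority(IntEnum):
    # Assumed ordinals following Flume's priority order; lower is more urgent.
    FATAL = 0
    ERROR = 1
    WARN = 2
    INFO = 3
    DEBUG = 4
    TRACE = 5


def warn_or_worse(labels):
    """Model WHERE PRIORITY_LEVEL() <= 2 over a list of PRIORITY() labels."""
    return [p for p in labels if Priority[p] <= Priority.WARN]
```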
Several data types are defined which can hold values of differing ranges. These column types do not follow ANSI SQL names; they are stored in underlying Java types and have the following ranges:
| rtsql type | Underlying Java type | Range |
| BOOLEAN | Boolean | true, false |
| BINARY | ByteBuffer | Any array of bytes |
| BIGINT | Long | $[-2^{63}, 2^{63}-1]$ |
| INT | Integer | $[-2^{31}, 2^{31}-1]$ |
| FLOAT | Float | $[2^{-149}, (2-2^{-23}) \cdot 2^{127}]$ (positive or negative) |
| DOUBLE | Double | $[2^{-1074}, (2-2^{-52}) \cdot 2^{1023}]$ (positive or negative) |
| PRECISE(n) | BigDecimal | A BigDecimal with the scale property set to n. |
| STRING | String | A UTF-8-encoded string |
| TIMESTAMP | (internal) | (See Section 6.3.7, “The TIMESTAMP type”) |
| LIST<t> | List | (See Section 6.3.8, “The LIST type”) |
Like all keywords in rtsql, type names are case-insensitive.
The BIGINT and INT types store integer values as 64- and 32-bit signed integers, respectively. The text representation of these values (e.g., in the delimited event format) is a base-10 integer.
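The integer ranges in the table, and parsing of the base-10 text form, can be checked with a short Python sketch. The names fits and parse_integer are hypothetical, and the accepted syntax (optional sign followed by ASCII digits) is an assumption modeled on Java's Long.parseLong() rather than a statement about FlumeBase's parser.

```python
import re

# Assumed syntax: optional sign, then one or more ASCII digits.
_BASE10 = re.compile(r"[+-]?[0-9]+")


def fits(value, bits):
    """True if value fits in a signed two's-complement integer of `bits` bits."""
    return -2**(bits - 1) <= value <= 2**(bits - 1) - 1


def parse_integer(text, bits):
    """Parse an INT (bits=32) or BIGINT (bits=64) from its base-10 text form."""
    if not _BASE10.fullmatch(text):
        raise ValueError(f"not a base-10 integer: {text!r}")
    value = int(text)
    if not fits(value, bits):
        raise ValueError(f"{text} is out of range for a {bits}-bit integer")
    return value
```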
The BOOLEAN type holds true/false values only. It
cannot be coerced to or from an integer type. The text
representations of these values are the UTF-8 strings
true and false.
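A strict parser for the BOOLEAN text form might look like this Python sketch. parse_boolean and format_boolean are hypothetical names, and the case-sensitive match is an assumption; the point is that integer strings such as "1" are rejected rather than coerced.

```python
def parse_boolean(text):
    """Parse a BOOLEAN field; only 'true' and 'false' are accepted (case-sensitive by assumption)."""
    if text == "true":
        return True
    if text == "false":
        return False
    # Integer text such as "1" or "0" is not coerced to a boolean.
    raise ValueError(f"not a BOOLEAN: {text!r}")


def format_boolean(value):
    """Render a BOOLEAN as its text representation."""
    return "true" if value else "false"
```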
The FLOAT and DOUBLE types hold
floating-point values. Operations on floating-point values
may be imprecise, subject to the constraints of the IEEE
floating-point format standard. All string values parsed by
java.lang.Float.valueOf() and
java.lang.Double.valueOf() may be
used as the text representation of these values.
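The FLOAT and DOUBLE bounds listed in the type table can be verified from their IEEE 754 bit patterns with Python's struct module. This is an illustration of the number formats, not of FlumeBase; the helper names are hypothetical. The as_float32 helper also shows the kind of imprecision mentioned above: 0.1 has no exact binary representation, so rounding it to 32 bits changes its value.

```python
import struct
import sys


def float32_from_bits(bits):
    """Reinterpret a 32-bit pattern as an IEEE 754 single-precision value."""
    return struct.unpack("<f", struct.pack("<I", bits))[0]


def as_float32(x):
    """Round a Python float (a 64-bit double) to the nearest single-precision value."""
    return struct.unpack("<f", struct.pack("<f", x))[0]


FLOAT_MIN = float32_from_bits(0x00000001)  # smallest positive (subnormal) FLOAT
FLOAT_MAX = float32_from_bits(0x7F7FFFFF)  # largest finite FLOAT
DOUBLE_MIN = 5e-324                        # smallest positive (subnormal) DOUBLE
DOUBLE_MAX = sys.float_info.max            # largest finite DOUBLE
```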
The PRECISE() type constructor defines
a family of numeric types providing specific degrees of
precision. The argument to the type constructor specifies
the "scale" of the value; if positive, this is the number
of significant digits to the right of the decimal point.
If 0, all digits are significant. Operations on PRECISE
values preserve the scale of the value.
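Python's decimal module behaves much like Java's BigDecimal here, so it can serve as a sketch of PRECISE(n) for positive n. The helpers to_precise and precise_mul are hypothetical, and the ROUND_HALF_EVEN rounding mode is an assumption, since the text does not say how FlumeBase rounds.

```python
from decimal import Decimal, ROUND_HALF_EVEN


def to_precise(value, scale):
    """Sketch of a PRECISE(scale) value for scale > 0: exactly `scale` digits after the point."""
    quantum = Decimal(1).scaleb(-scale)  # scale=2 -> 1E-2
    # The rounding mode is an assumption; the text does not specify one.
    return Decimal(str(value)).quantize(quantum, rounding=ROUND_HALF_EVEN)


def precise_mul(a, b, scale):
    """Multiply two PRECISE(scale) values and keep the result at the same scale."""
    # Decimal multiplication adds the operands' scales (2 + 2 -> 4), so requantize.
    return to_precise(a * b, scale)
```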
The STRING type will hold a UTF-8 encoded character string of arbitrary length. (Events in Flume have a maximum length, which defaults to 32 KB. The contents of all columns together in an event are subject to this limit.)
The empty string is a legal value in rtsql. In delimited events, a
NULL string is denoted by a configurable escape
sequence which defaults to the two characters \N.
Delimited events may not contain the delimiter character itself; the
ability to escape these characters is future work.
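The NULL and empty-string rules for delimited events can be sketched as follows. The comma delimiter and the function name parse_delimited are assumptions for illustration; only the \N default comes from the text.

```python
def parse_delimited(line, delim=",", null_seq="\\N"):
    """Split a delimited event body into STRING fields (hypothetical helper).

    A field equal to the null escape sequence (default: backslash followed
    by N) becomes None, i.e. NULL; an empty field stays "". There is no
    escaping, so a field can never contain the delimiter itself.
    """
    return [None if field == null_seq else field for field in line.split(delim)]
```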
The BINARY type holds a byte buffer of arbitrary bytes. When reading from an Avro input source, any byte array can be specified. When reading from text-based inputs, the byte array will be the bytes representing the UTF-8 encoding of the string input.
When a BINARY value is coerced to the STRING type (either implicitly or
explicitly through the BIN2STR() function),
its bytes are decoded using the UTF-8 character set.
The STR2BIN() function will do the reverse,
returning a BINARY object that explicitly represents
the UTF-8 bytes of its input string argument.
Descriptions of functions that manipulate binary data are available in Section 9.4, “Functions to work with binary data”.
rtsql employs a notion of a TIMESTAMP distinct from other numerical values. A TIMESTAMP contains two fields: a milliseconds part and a nanoseconds part. The milliseconds part holds the number of milliseconds since the UNIX epoch (00:00:00 UTC, Jan 1, 1970). The nanoseconds part holds the number of nanoseconds after the specified millisecond and should be less than 1,000,000.
Despite this internal precision, the delimited event format parses
timestamps as a single 64-bit base-10 integer corresponding to the
milliseconds part, and the nanoseconds part is 0.
Furthermore, FlumeBase's internal notion of time and event-ordering works
only at the granularity of milliseconds. The Avro event format allows
the nanoseconds part to be specified. Even in this case, a column
specified by timestamp.col (see
Section 6.1.1.1, “Designated timestamp columns”) will associate only the
milliseconds part of a timestamp with the event itself.
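A TIMESTAMP's two fields and the millisecond-only ordering can be sketched as a small Python dataclass. The class and method names are hypothetical; the sketch only encodes the rules stated above.

```python
from dataclasses import dataclass


@dataclass(frozen=True, order=True)
class Timestamp:
    """Hypothetical sketch of an rtsql TIMESTAMP: epoch millis plus extra nanos."""
    millis: int     # milliseconds since 1970-01-01T00:00:00Z
    nanos: int = 0  # nanoseconds after `millis`, in [0, 1_000_000)

    def __post_init__(self):
        if not 0 <= self.nanos < 1_000_000:
            raise ValueError("nanos must be in [0, 1000000)")

    @classmethod
    def from_delimited(cls, text):
        # The delimited format carries only the milliseconds part.
        return cls(int(text), 0)

    def event_time(self):
        # Event ordering inside FlumeBase uses millisecond granularity only.
        return self.millis
```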
The LIST type allows you to represent a list of values as the value for a single column. The values in a list may be manipulated as a group, or individual values may be extracted from the list.
Values in a list must all have the same type. This type is
specified with the syntax: LIST<t>,
where t is the specification of another
type. For example: LIST<INT>, or
LIST<STRING NOT NULL>. You may not
specify LIST without specifying the types
of the members of the list.
Any type may appear in the parameter to the
LIST type constructor. It is legal to specify,
for example, LIST<LIST<INT NOT
NULL>>. Note that LIST<INT> NOT
NULL is different from LIST<INT NOT
NULL> and LIST<INT NOT NULL> NOT
NULL, though all three of these are legal.
Lists can be parsed from events with the
delimited event format; their elements are
separated by pipe characters ("|"). You can
override this by specifying the
list.delim property of the
event format.
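Parsing a LIST column from delimited text can be sketched as follows. parse_list is a hypothetical helper, and treating an empty field as an empty list is an assumption, not documented behavior.

```python
def parse_list(field, elem_parser=str, list_delim="|"):
    """Parse a LIST<t> column from delimited text (hypothetical helper).

    Elements are separated by list_delim ("|" unless the list.delim event
    format property overrides it); elem_parser converts each element to t.
    """
    if field == "":
        return []  # assumption: an empty field is an empty list
    return [elem_parser(item) for item in field.split(list_delim)]
```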
Several functions exist to construct and manipulate lists; a reference is provided in Section 9.3, “Functions that operate on lists”.
Values of some types may be coerced to act as another type. This is performed automatically as necessary; no explicit type-casting operators are used (nor are they provided by the language). Coercion may only occur when moving to a "broader" type; this is called promotion in rtsql. The value itself may be modified to conform to the specific domain of the target type, but this is only performed if the modification would not lose data.
For example, an expression adding an INT value with a BIGINT value will provide a BIGINT result. The INT argument will be promoted to BIGINT and then added to the other BIGINT. Similarly, concatenating a STRING and an INT will coerce the integer into a string representation, and then concatenate it with the string.
Numeric values may be promoted to any broader numeric type. From narrowest to broadest, the numeric types are INT, BIGINT, FLOAT, and DOUBLE. PRECISE types may be promoted to any broader PRECISE type. INT and BIGINT promote to PRECISE(0), FLOAT promotes to PRECISE(24), and DOUBLE promotes to PRECISE(53).
All scalar types may be promoted to STRING. The result of coercing a value to STRING is the string representation of the value, as defined in the previous subsections.
Any type X NOT NULL may be promoted to its nullable counterpart.
A type LIST<X> may promote to LIST<Y> if X promotes to Y.
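The promotion rules above can be summarized as a Python predicate. This is a simplified sketch, not FlumeBase's type checker: the T class and promotes_to function are hypothetical, PRECISE types are left out, and BINARY and TIMESTAMP are assumed to count as scalars for the STRING rule.

```python
from dataclasses import dataclass
from typing import Optional

NUMERIC = ["INT", "BIGINT", "FLOAT", "DOUBLE"]  # narrowest to broadest


@dataclass(frozen=True)
class T:
    """A simplified rtsql type: a name, a nullability flag, and a LIST element type."""
    name: str
    nullable: bool = True
    elem: Optional["T"] = None  # only used when name == "LIST"


def promotes_to(src: T, dst: T) -> bool:
    # X NOT NULL may become nullable X, but never the other way around.
    if src.nullable and not dst.nullable:
        return False
    # LIST<X> promotes only to LIST<Y>, and only if X promotes to Y.
    if src.name == "LIST" or dst.name == "LIST":
        return src.name == dst.name and promotes_to(src.elem, dst.elem)
    if src.name == dst.name:
        return True
    # Numeric types widen along INT < BIGINT < FLOAT < DOUBLE.
    if src.name in NUMERIC and dst.name in NUMERIC:
        return NUMERIC.index(src.name) <= NUMERIC.index(dst.name)
    # Any other scalar (assumed to include BINARY and TIMESTAMP) may become STRING.
    return dst.name == "STRING"
```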
Some functions may operate over a variety of types. For example,
the sum function will add values of any
numeric type. Since DOUBLE is the widest numeric
type available, the type of the argument column for
sum could be marked as DOUBLE.
But that would promote every input to sum
to a DOUBLE value, and the output would also be
DOUBLE. This is not the case; using the
DESCRIBE statement, we can see the type of
the sum function is:
rtsql> DESCRIBE sum;
sum ((var('a, constraints={TYPECLASS_NUMERIC})) -> var('a, constraints={TYPECLASS_NUMERIC}))
In order to allow the sum function to accept
a variety of input types, its input type is specified as
a type variable with a universal type; this
is denoted by the var('a) type. The name of
this type variable is 'a and pronounced "alpha." Type
variables are bound to concrete types (e.g., INT) when
the rtsql statement is compiled. The same type variable specifies
the argument and return types of sum; this way,
it will return the same type as it receives as input. (This can be
verified by observing that 'a is also the name of
the output type.)
The sum function cannot sum a set of strings,
however. Nor can it sum boolean values. Type variables may be
specified with a set of constraints; the type a variable is bound to
must conform to these constraints. The sum function can operate only
over numeric values. To allow this constraint specification,
rtsql provides a set of type
classes, which are sets of types. Type classes are not
themselves concrete types; a column cannot be specified with
a type class. The type classes are listed in
Table 5, “Type classes and constraints in rtsql”.
| Type class | Concrete types included | Example use |
| TYPECLASS_NUMERIC | INT, BIGINT, FLOAT, DOUBLE | Input to and output of the sum function |
| TYPECLASS_COMPARABLE | All numeric types, STRING, and BOOLEAN | The max and min functions |
| TYPECLASS_ANY | All concrete types | The count function |
| (any type class) NOT NULL | Each type class may be further restricted to NOT NULL. | |
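Binding a constrained type variable can be sketched in Python as follows. The bind function is hypothetical, the TYPECLASS_ANY set lists only a subset of the concrete types, and the sketch does not promote mismatched arguments (for example INT and BIGINT) to a common type, which the real compiler may well do.

```python
NUMERIC = {"INT", "BIGINT", "FLOAT", "DOUBLE"}

# Simplified type classes; TYPECLASS_ANY lists only a subset of concrete types.
TYPECLASSES = {
    "TYPECLASS_NUMERIC": NUMERIC,
    "TYPECLASS_COMPARABLE": NUMERIC | {"STRING", "BOOLEAN"},
    "TYPECLASS_ANY": NUMERIC | {"STRING", "BOOLEAN", "BINARY", "TIMESTAMP"},
}


def bind(arg_types, constraint):
    """Bind a type variable 'a shared by all arguments, checking its type class.

    Returns the concrete type 'a is bound to, which is also the result type
    for a signature like sum(('a) -> 'a).
    """
    distinct = set(arg_types)
    if len(distinct) != 1:
        raise TypeError(f"'a cannot be bound to all of {sorted(distinct)}")
    (bound,) = distinct
    if bound not in TYPECLASSES[constraint]:
        raise TypeError(f"{bound} is not in {constraint}")
    return bound
```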
Some functions allow a variable-length argument list. They may take
a number of required arguments, and may then have a list of arguments
of arbitrary length. The to_list() function, for
example, will construct a list out of all its arguments. The following
selects an empty list:
rtsql> select to_list() from x;
timestamp to_list()
1306906081936 []
Whereas the following will construct a list of 3 elements:
rtsql> select to_list(42,211,312) from x;
timestamp to_list(42, 211, 312)
1306906138072 [42, 211, 312]
Variable-length argument lists are denoted by an ellipsis (...)
in the type signature of the function. For example:
rtsql> describe to_list;
to_list ((var('a, constraints={TYPECLASS_ANY})...) -> LIST<var('a, constraints={TYPECLASS_ANY})> NOT NULL)
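A Python sketch of a variadic function in the spirit of to_list's signature follows. It is hypothetical: it simply requires every non-NULL argument to have the same Python type, standing in for the single type variable 'a, and performs no promotion.

```python
def to_list(*values):
    """Hypothetical sketch of to_list(('a...) -> LIST<'a> NOT NULL).

    Any number of arguments (including zero) is accepted. One type variable
    covers every position, so all non-NULL arguments must share one type.
    """
    kinds = {type(v) for v in values if v is not None}
    if len(kinds) > 1:
        raise TypeError(f"arguments do not share one type: {kinds}")
    return list(values)  # never None: the result type is NOT NULL
```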
The FlumeBase server allows a server process to handle the execution of queries without requiring a long-lived shell session. FlumeBase shell instances can connect to local or remote server instances to submit queries. Before running a FlumeBase server, you should read Section 4.1, “Server configuration”.
The FlumeBase server can be started as a foreground process by
running bin/flumebase server from the directory
where FlumeBase was installed.
A FlumeBase server can also be run in the background, by
running bin/flumebase start. Log output
will be captured in a log file in $FLUMEBASE_LOG_DIR
(default: $FLUMEBASE_HOME/logs). If the
server is misbehaving, running bin/flumebase start -debug
will enable debug logging.
A running server may be stopped from within the FlumeBase shell. To learn how, see Section 8.2, “Connecting to the execution environment”.
To shut down the server from the command line, run
bin/flumebase stop. This may take a few seconds,
as FlumeBase will send a set of instructions to the Flume master
to restore its flows to their original state, and "unplug" the running
queries. A hard kill (e.g., kill -9) of the FlumeBase
server is not recommended.
The FlumeBase shell allows users to interact with the FlumeBase environment. The default FlumeBase configuration connects to a single-threaded execution environment within the same process as the shell. You may also connect to a remote execution environment running in another process (on the same or a different machine).
The FlumeBase shell can be used to transmit rtsql statements defined in Section 6, “The rtsql language” to the execution environment. Several control commands are also defined which allow users to interact with the shell or the environment itself.
To start the FlumeBase shell, run bin/flumebase shell
from the directory where FlumeBase is installed.
As mentioned, FlumeBase's default configuration file causes an automatic connection to the local environment. You can connect to a remote environment with the command:
rtsql> \open server [port]
You can connect to the local (self-hosted) environment explicitly with the command:
rtsql> \open local
The shell can connect to at most one environment at a time. A
\open command automatically disconnects from any
previously-connected environment. You can explicitly disconnect from
the environment with the command:
rtsql> \disconnect
You can close the shell with the command:
rtsql> \q
This is a synonym for:
rtsql> exit;
You can shut down the execution environment (which stops all running flows) with the command:
rtsql> \shutdown!
Each SELECT statement is instantiated as a flow in
the execution environment. Flows are persistent: they continue to read
data from the associated Flume sources indefinitely, even if the
client disconnects.
You can get a list of all running flows with the command:
rtsql> \f
This returns the following fields of information:
| Column | Description |
| Watch? | This column has a * in it if you are watching the output of this flow on your console. |
| FlowId | The numeric id associated with the flow. Commands that control a flow will use this id. |
| Stream | If the output of this flow is used as a stream (e.g., CREATE STREAM foo AS SELECT...), the name of the stream (foo) is shown here. |
| Query | The actual rtsql query which was used to create this flow. |
By default, when you submit a flow (that is, run a SELECT statement),
you are watching the output. Any events which are generated by this
flow will be printed to your console. You can unwatch a flow with the
\u or \unwatch commands.
The following command unwatches a flow with FlowId 3:
rtsql> \u 3
(You can find the flowId for a flow with the \f command first.)
You can then resubscribe to a flow with the \w or
\watch commands. This will resubscribe to the same
flow’s output:
rtsql> \w 3
You can configure your session to not automatically watch flows as you
create them; you will then need to explicitly watch any flows you create
if you want to inspect their output. This property is controlled by
the flumebase.flow.autowatch key in the session
configuration (see Section 8.5, “Controlling the session configuration”).
You can cancel a flow entirely with the \d and
\D commands. Each of these takes a FlowId, and
destroys the associated flow. The \d command does not
block; \D will wait until the flow is complete before
returning:
rtsql> \d 3
If a flow was created via CREATE STREAM AS SELECT, the stream name
associated with the flow can be removed, without stopping the flow
itself, via the \dname command:
rtsql> CREATE STREAM foo AS SELECT x FROM bar;
Started flow: flow[mId=5]
rtsql> \dname 5
Removed stream name from flow 5
A new stream name can then be attached to the same flow:
rtsql> \name 5 baz
Created stream 'baz' on flow 5
A stream name can be attached to any running flow with the
\name command. This creates a new Flume logical node
with the same name as the stream; its source is populated with events
containing Avro representations of the fields emitted by the flow.
A flow can have at most one stream name attached at a time.
Each client session has a configuration associated with it; this is
initially populated from the configuration files. The configuration is a
set of key=val pairs, where both the keys and values are strings
(although some keys are expected to have values which behave as
integers, etc.). The configuration can be viewed with the \set command:
rtsql> \set
io.seqfile.compress.blocksize = '1000000'
io.skip.checksum.errors = 'false'
fs.checkpoint.size = '67108864'
...
Configuration keys are roughly hierarchical. You can view the
configuration keys under any prefix that ends with a "."
by typing \set followed by that prefix:
rtsql> \set flumebase.
flumebase.flume.master.port = '35873'
flumebase.flume.master.host = 'localhost'
flumebase.autoconnect = 'local'
You can also view any specific key:
rtsql> \set flumebase.autoconnect
flumebase.autoconnect = 'local'
This command may also be used to set the value of any key:
rtsql> \set flumebase.flume.master.port=12345
flumebase.flume.master.port = '12345'
Setting configuration values does not modify the behavior of any previously-submitted flows. The behavior of a new flow may be controlled by setting configuration keys, and then submitting the query that generates the flow.
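The session behavior described above, including the fact that earlier flows are unaffected by later \set commands, can be modeled by having each flow copy the configuration when it is submitted. The Session class below is a hypothetical sketch, not FlumeBase's client code.

```python
class Session:
    """Hypothetical model of a shell session's key=val configuration."""

    def __init__(self, defaults):
        self.conf = dict(defaults)  # keys and values are both strings
        self.flows = []

    def set(self, assignment):
        # Mirrors "\set key=value": split at the first "=".
        key, value = assignment.split("=", 1)
        self.conf[key.strip()] = value.strip()

    def show(self, prefix=""):
        # Mirrors "\set prefix.": every key that starts with the prefix.
        return {k: v for k, v in sorted(self.conf.items()) if k.startswith(prefix)}

    def submit(self, query):
        # A new flow captures a snapshot of the configuration at submit time,
        # so later set() calls cannot change its behavior.
        flow = {"query": query, "conf": dict(self.conf)}
        self.flows.append(flow)
        return flow
```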
At any time when inputting a command, the current input may be canceled
by appending \c to the current line, and pressing enter:
rtsql> SELECT something where I made a typo
    -> and then continued \c
rtsql>
A help message which lists all available control commands can be
accessed by typing help; or
\h:
rtsql> \h
All text commands must end with a ';' character.
Session control commands must be on a line by themselves.
Session control commands:
\c Cancel the current input statement.
\d flowId Drop the specified flow.
\D flowId Drop a flow and wait for it to stop.
...
This section describes all available functions to apply to values and
events. A list of functions and their type signatures can also be
accessed within the FlumeBase shell by typing SHOW
FUNCTIONS;.
| Function | Accesses | Type |
| EVENT_TIMESTAMP() | Event timestamp and nanos properties | TIMESTAMP NOT NULL |
| HOST() | Event origin host | STRING NOT NULL |
| PRIORITY() | Event priority label | STRING NOT NULL |
| PRIORITY_LEVEL() | Event priority as an integer | INT NOT NULL |
The functions in this section all operate over a window of data and return aggregate values.
| Function name | Description |
| COUNT(*) | Counts the number of events which match the group and time interval |
| COUNT(expr) | Counts the number of events where expr is non-null |
| SUM(expr) | Returns the sum of the values in expr |
| MAX(expr) | Returns the maximum value for expr |
| MIN(expr) | Returns the minimum value for expr |
| | Returns the arithmetic mean value for expr |
rtsql does not support the COUNT(DISTINCT col) syntax.
The following functions construct and manipulate data of type
LIST<t>.
| Function name | Description |
| | Concatenates a set of lists into a single list where all items have the same type. |
| | Returns true if lst contains val. |
| | Returns the idx'th value in lst. |
| | Returns the number of items in lst. |
| to_list(...) | Returns a LIST<'a> containing the items specified as arguments. |
| Function name | Description |
| BIN2STR(b) | Returns a STRING containing the string representation of b. Assumes b is UTF-8 encoded. |
| STR2BIN(s) | Returns a BINARY representation of the UTF-8 encoding of s. |
| Function name | Description |
| | Returns the number of characters in s. |
| STR2BIN(s) | Returns a BINARY representation of the UTF-8 encoding of s. |
| Function name | Description |
| | Returns the current timestamp on the rtsql server. |
| | Returns the timestamp associated with the current event being processed. |