HBlocks
A New Model For Big Data BI: Realtime and Easy-To-Use
A core element of Apache Hadoop is the MapReduce programming model. MapReduce enables programmers to develop massively parallel programs in languages such as Java and Python. The execution environment handles issues such as automatic fault tolerance and recovery.
The MapReduce model breaks the processing of large data sets into two phases: a Map phase and a Reduce phase. The two phases roughly correspond to the standard Map and Reduce primitives in functional programming, although the model is normally used with imperative programming languages such as Java, rather than with functional languages.
The MapReduce model and execution environment are designed for offline batch analytics at scale, and are intended for use by experienced programmers.
Today, big data analytics is increasingly concerned with processing live realtime data streams. These streams are often machine-generated, and the data arrives continuously at very high velocity. To handle such streams, and deliver maximum competitive advantage, we need a new big data analytics model that enables continuous, always-on realtime analytics, rather than offline batch analytics.
Another important point is that today many of those who need to analyze big data are business users rather than experienced programmers. Many of these business users are familiar with spreadsheets and other productivity tools, but have little or no experience of programming in languages such as Java or Python. So we also need a big data analytics model that provides a higher-level interface than that provided by the MapReduce model.
HBlocks Model
HBlocks is a new, simple, but very powerful big data analytics model that addresses both of these challenges. With HBlocks, ordinary business users can easily develop powerful, massively parallel big data business intelligence applications. The HBlocks model makes no distinction between live realtime data streams and historical data sets, so HBlocks supports both realtime analytics and batch analytics, and does so seamlessly.
Technically, HBlocks is a single-assignment dataflow model, based on a small set of data-parallel functional primitives: map, scan, previous, window, group, select and project. Implementations of the HBlocks model, such as Pinpoint, augment these core primitives with a rich library of built-in functions.
Map
The map primitive creates a new column by applying a row-wise operation to one or more existing columns. For example, suppose we have two columns price and volume, where
| price | 20.50 | 18.02 | 7.59 | 10.05 | ... |
| volume | 10 | 12 | 20 | 15 | ... |
and we define the data-parallel map operation
total = price * volume
then the new column will be
| total | 205.00 | 216.24 | 151.80 | 150.75 | ... |
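The map primitive can be sketched in a few lines of Python. This sketch assumes each column is a plain list of equal length. `map_columns` is a hypothetical helper for illustration only, not part of Pinpoint or any published HBlocks API.

```python
from typing import Callable, List


def map_columns(fn: Callable[..., float], *columns: List[float]) -> List[float]:
    """Apply a row-wise function across one or more equal-length columns."""
    # zip yields the i-th value of every input column together, i.e. one row at a time
    return [fn(*row) for row in zip(*columns)]


price = [20.50, 18.02, 7.59, 10.05]
volume = [10, 12, 20, 15]

# total = price * volume, rounded to cents to hide floating-point noise
total = map_columns(lambda p, v: round(p * v, 2), price, volume)
print(total)  # [205.0, 216.24, 151.8, 150.75]
```

Because each output value depends only on the values in the same row, every row can be computed independently, which is what makes map data-parallel.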
Window
The time window primitive partitions the stream into disjoint, contiguous substreams, each covering a fixed time duration. In the HBlocks model, every row has a unique (timestamp, index) pair, and the rows are linearly ordered by that pair of values. Timestamps are non-decreasing throughout the stream: they increase, but not necessarily strictly. Index values define the order of rows that share the same timestamp. Suppose you have the stream
| timestamp | 12:00 | 12:01 | 12:05 | 12:10 | 12:13 | 12:13 | 12:17 | ... |
| volume | 10 | 12 | 20 | 15 | 8 | 14 | 21 | ... |
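and you partition it into five-minute windows aligned on 12:00 (each window includes its start time but not its end time), then the stream starts with the four substreams shown
| timestamp | 12:00 | 12:01 |
| volume | 10 | 12 |
| timestamp | 12:05 |
| volume | 20 |
| timestamp | 12:10 | 12:13 | 12:13 |
| volume | 15 | 8 | 14 |
| timestamp | 12:17 | ... |
| volume | 21 | ... |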
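A minimal Python sketch of the time window primitive follows. It assumes timestamps are "HH:MM" strings and that windows are half-open intervals aligned on a given origin; `to_minutes` and `time_windows` are hypothetical helpers, not Pinpoint functions. The sketch returns each substream as a list of row indices.

```python
from typing import List, Optional


def to_minutes(hhmm: str) -> int:
    """Convert an 'HH:MM' timestamp to minutes since midnight."""
    hours, minutes = hhmm.split(":")
    return int(hours) * 60 + int(minutes)


def time_windows(timestamps: List[str], width: int, origin: str) -> List[List[int]]:
    """Partition row indices into disjoint windows of `width` minutes aligned on `origin`.

    Assumes timestamps are non-decreasing, as in the HBlocks model. A time span
    that contains no rows produces no substream in this sketch.
    """
    start = to_minutes(origin)
    windows: List[List[int]] = []
    current_id: Optional[int] = None
    for row, ts in enumerate(timestamps):
        # Fixed-width bucket for this row (start inclusive, end exclusive)
        window_id = (to_minutes(ts) - start) // width
        if window_id != current_id:
            # Crossing a window boundary closes the old substream and opens a new one
            windows.append([])
            current_id = window_id
        windows[-1].append(row)
    return windows


timestamps = ["12:00", "12:01", "12:05", "12:10", "12:13", "12:13", "12:17"]
print(time_windows(timestamps, 5, "12:00"))  # [[0, 1], [2], [3, 4, 5], [6]]
```

Since timestamps never decrease, a single pass suffices: a new substream begins exactly when the bucket number changes.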
A more general form of the window primitive is the "punctuation window". Given a column of boolean values, a new substream is started at each row where the value is true, and the previous substream is closed. For example, given the columns
| punc | false | true | false | false | true | true | false | ... |
| volume | 10 | 12 | 20 | 15 | 8 | 14 | 21 | ... |
a punctuation window on the column punc would give
| punc | false |
| volume | 10 |
| punc | true | false | false |
| volume | 12 | 20 | 15 |
| punc | true |
| volume | 8 |
| punc | true | false | ... |
| volume | 14 | 21 | ... |
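A punctuation window can be sketched in Python as a single pass over the boolean column. This is an illustrative sketch, not Pinpoint's implementation; `punctuation_windows` is a hypothetical name, and substreams are returned as lists of row indices.

```python
from typing import List


def punctuation_windows(punc: List[bool]) -> List[List[int]]:
    """Start a new substream at every row whose punctuation flag is True."""
    windows: List[List[int]] = []
    for row, flag in enumerate(punc):
        # The first row always opens a window; a True flag closes the previous one
        if flag or not windows:
            windows.append([])
        windows[-1].append(row)
    return windows


punc = [False, True, False, False, True, True, False]
volume = [10, 12, 20, 15, 8, 14, 21]
print([[volume[i] for i in w] for w in punctuation_windows(punc)])
# [[10], [12, 20, 15], [8], [14, 21]]
```

A size-based window of N rows is then just a punctuation column that is true on every N-th row, for example `[i % 3 == 0 for i in range(7)]` for windows of three rows.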
Punctuation windows allow streams to be partitioned not only by time, but by other criteria such as size (for example, windows of 50,000 rows). Another example is starting a new window as soon as an important event is detected.
Time and punctuation windows allow window functions to be used in the HBlocks model. For example, suppose we partition the following stream into the same five-minute windows aligned on 12:00
| timestamp | 12:00 | 12:01 | 12:05 | 12:10 | 12:13 | 12:13 | 12:17 | ... |
| volume | 10 | 12 | 20 | 15 | 8 | 14 | 21 | ... |
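then we can create a new column using the "window_sum" function, which gives each row the sum of volume over the window containing it (the final entry, 21+, indicates that later rows in the last window will add to that sum)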
| timestamp | 12:00 | 12:01 | 12:05 | 12:10 | 12:13 | 12:13 | 12:17 | ... |
| volume | 10 | 12 | 20 | 15 | 8 | 14 | 21 | ... |
| sum | 22 | 22 | 20 | 37 | 37 | 37 | 21+ | ... |
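The window_sum computation can be sketched in Python as follows. To keep the example self-contained, the row indices of the four five-minute windows are written out by hand; `window_sum` here is a hypothetical stand-in for the built-in function, and its signature is an assumption.

```python
from typing import List


def window_sum(values: List[int], windows: List[List[int]]) -> List[int]:
    """Give every row the sum of `values` over the window that contains it."""
    result = [0] * len(values)
    # Windows are disjoint, so each iteration is independent and could run in parallel
    for window in windows:
        total = sum(values[row] for row in window)
        for row in window:
            result[row] = total
    return result


volume = [10, 12, 20, 15, 8, 14, 21]
# Row indices of the four five-minute windows aligned on 12:00
windows = [[0, 1], [2], [3, 4, 5], [6]]
print(window_sum(volume, windows))  # [22, 22, 20, 37, 37, 37, 21]
```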
Implementations of the HBlocks model, such as Pinpoint, provide many window functions for reduction, ranking, sorting, and so on. Since windows are disjoint, a window function can be computed on all the windows in a stream in parallel.
Select
Given a column of boolean values, the select primitive discards all rows for which the value is false. For example, given the columns
| filter | false | true | false | false | true | true | false | ... |
| volume | 10 | 12 | 20 | 15 | 8 | 14 | 21 | ... |
a select on the column filter would give
| filter | true | true | true | ... |
| volume | 12 | 8 | 14 | ... |
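As a minimal sketch, assuming a stream is represented as a Python dict mapping column names to equal-length lists, select keeps the same row positions in every column. The `Table` alias and `select` function are hypothetical names for illustration.

```python
from typing import Dict, List

# Hypothetical representation: column name -> list of values, all the same length
Table = Dict[str, List]


def select(table: Table, flag: str) -> Table:
    """Keep only the rows whose value in the boolean column `flag` is True."""
    keep = [i for i, f in enumerate(table[flag]) if f]
    # Apply the same surviving row positions to every column
    return {name: [col[i] for i in keep] for name, col in table.items()}


stream = {
    "filter": [False, True, False, False, True, True, False],
    "volume": [10, 12, 20, 15, 8, 14, 21],
}
print(select(stream, "filter"))
# {'filter': [True, True, True], 'volume': [12, 8, 14]}
```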
Project
Given a set S of columns, the project primitive discards all columns other than those in S.
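Using the same assumed dict-of-lists representation, project is a filter over column names rather than rows. The `project` function below is a hypothetical sketch, not a Pinpoint API.

```python
from typing import Dict, Iterable, List

# Hypothetical representation: column name -> list of values
Table = Dict[str, List]


def project(table: Table, keep: Iterable[str]) -> Table:
    """Discard every column whose name is not in `keep` (the set S)."""
    wanted = set(keep)
    # Preserve the original column order for the columns that survive
    return {name: col for name, col in table.items() if name in wanted}


stream = {"price": [20.50, 18.02], "volume": [10, 12], "symbol": ["ASDE", "DRVC"]}
print(project(stream, ["price", "symbol"]))
# {'price': [20.5, 18.02], 'symbol': ['ASDE', 'DRVC']}
```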
Previous
The previous primitive creates a new column by shifting the selected column down by one row. For example, suppose we have
| price | 20.50 | 18.02 | 7.59 | 10.05 | ... |
and we define the data-parallel previous operation
last_one = previous(price)
then the new column will be
| last_one | default | 20.50 | 18.02 | 7.59 | ... |
where default is the default value for that type of column. Note that in the HBlocks model, circular definitions are invalid, so defining an operation such as
price = previous(price) + 10.8
is not allowed. Multiple definitions containing a circularity are also prohibited, for example
last_one = previous(price)
price = last_one + 7
These kinds of iterative definitions can, however, be specified using the scan primitive, described below.
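The previous primitive can be sketched in Python as a one-row shift. The text says the first row receives "the default value for that type of column"; this sketch assumes the caller passes that default explicitly (0.0 for a numeric column in the example). `previous` here is a hypothetical helper, not Pinpoint's implementation.

```python
from typing import List, TypeVar

T = TypeVar("T")


def previous(column: List[T], default: T) -> List[T]:
    """Shift a column down one row; the first row receives `default`."""
    if not column:
        return []  # an empty column stays empty
    # Drop the last value and prepend the default, so the length is unchanged
    return [default] + column[:-1]


price = [20.50, 18.02, 7.59, 10.05]
last_one = previous(price, 0.0)  # assumed numeric default of 0.0
print(last_one)  # [0.0, 20.5, 18.02, 7.59]
```

Because `previous` only reads an existing column and writes a new one, it never redefines its own input, which is consistent with the single-assignment rule above.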
Group
The group primitive partitions the stream into disjoint substreams based on the values in the "groupby" column. A separate substream is created for each unique value in the groupby column. For example, suppose we have
| price | 20.50 | 18.02 | 7.59 | 10.05 | 17.98 | 20.48 | 10.02 | 17.96 | 20.48 | ... |
| volume | 10 | 12 | 20 | 15 | 12 | 25 | 12 | 15 | 22 | ... |
| symbol | ASDE | DRVC | DFSW | KFCR | DRVC | ASDE | KFCR | DRVC | ASDE | ... |
and we group by symbol, then we have
| price | 20.50 | 20.48 | 20.48 | ... |
| volume | 10 | 25 | 22 | ... |
| symbol | ASDE | ASDE | ASDE | ... |
| price | 18.02 | 17.98 | 17.96 | ... |
| volume | 12 | 12 | 15 | ... |
| symbol | DRVC | DRVC | DRVC | ... |
| price | 7.59 | ... |
| volume | 20 | ... |
| symbol | DFSW | ... |
| price | 10.05 | 10.02 | ... |
| volume | 15 | 12 | ... |
| symbol | KFCR | KFCR | ... |
Since groups are disjoint, the computation of functions on all the groups in a stream can be performed in parallel.
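A minimal Python sketch of group, assuming substreams are represented as lists of row indices keyed by the groupby value. `group_by` is a hypothetical helper; Python dicts preserve insertion order, so groups appear in order of first occurrence, matching the example above.

```python
from typing import Dict, List


def group_by(keys: List[str]) -> Dict[str, List[int]]:
    """Partition row indices by key, preserving stream order within each group."""
    groups: Dict[str, List[int]] = {}
    for row, key in enumerate(keys):
        # Each distinct key gets its own substream
        groups.setdefault(key, []).append(row)
    return groups


symbol = ["ASDE", "DRVC", "DFSW", "KFCR", "DRVC", "ASDE", "KFCR", "DRVC", "ASDE"]
price = [20.50, 18.02, 7.59, 10.05, 17.98, 20.48, 10.02, 17.96, 20.48]
for key, rows in group_by(symbol).items():
    print(key, [price[i] for i in rows])
# ASDE [20.5, 20.48, 20.48]
# DRVC [18.02, 17.98, 17.96]
# DFSW [7.59]
# KFCR [10.05, 10.02]
```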
Scan
The scan primitive creates a new column by applying a binary operation cumulatively down the selected column, so that each row holds the result of combining all values up to and including that row. For example, suppose we have
| price | 20.50 | 18.02 | 7.59 | 10.05 | ... |
and we define the data-parallel scan operation
prefix_sum = scan( + , price )
then the new column will be
| prefix_sum | 20.50 | 38.52 | 46.11 | 56.16 | ... |
Similarly, if we have the windowed stream (using the same five-minute windows aligned on 12:00)
| timestamp | 12:00 | 12:01 | 12:05 | 12:10 | 12:13 | 12:13 | 12:17 | ... |
| volume | 10 | 12 | 20 | 15 | 8 | 14 | 21 | ... |
and we define the data-parallel scan operation
prefix_min = scan( min , volume )
then the scan restarts at the beginning of each window, and the new stream will be
| timestamp | 12:00 | 12:01 | 12:05 | 12:10 | 12:13 | 12:13 | 12:17 | ... |
| volume | 10 | 12 | 20 | 15 | 8 | 14 | 21 | ... |
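| prefix_min | 10 | 10 | 20 | 15 | 8 | 8 | 21- | ... |
Here 21- indicates that the last value is at most 21, since later rows in that window may lower the minimum.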
Implementations of the HBlocks model, such as Pinpoint, provide a number of simple built-in scan functions (prefix_sum, prefix_min, etc.), as well as a general mechanism for user-defined scan functions.
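Both scan examples can be sketched in Python with `itertools.accumulate`, which computes an inclusive prefix scan with any binary function. The `scan` and `windowed_scan` names are hypothetical and are not Pinpoint's API; the window row indices are written out by hand to keep the example self-contained.

```python
import operator
from itertools import accumulate
from typing import Callable, List, TypeVar

T = TypeVar("T")


def scan(op: Callable[[T, T], T], column: List[T]) -> List[T]:
    """Inclusive prefix scan: row i holds op folded over rows 0..i."""
    return list(accumulate(column, op))


def windowed_scan(op: Callable[[T, T], T], column: List[T],
                  windows: List[List[int]]) -> List[T]:
    """Run an independent scan inside each window, restarting at each boundary."""
    result = list(column)  # every row belongs to some window, so all get overwritten
    for window in windows:
        partial = scan(op, [column[row] for row in window])
        for row, value in zip(window, partial):
            result[row] = value
    return result


price = [20.50, 18.02, 7.59, 10.05]
print([round(x, 2) for x in scan(operator.add, price)])  # [20.5, 38.52, 46.11, 56.16]

volume = [10, 12, 20, 15, 8, 14, 21]
windows = [[0, 1], [2], [3, 4, 5], [6]]
print(windowed_scan(min, volume, windows))  # [10, 10, 20, 15, 8, 8, 21]
```

Any associative binary function can be passed as `op`, which loosely mirrors the idea of user-defined scan functions; associativity is what allows a scan to be parallelized with standard prefix-sum techniques.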
Reduce
As noted above, the HBlocks model provides several ways of performing a reduce operation, such as window_sum and prefix_sum on columns. There is therefore no need for a separate reduce primitive. Since windows and groups are disjoint, the computation of reductions on all the windows or groups in a stream can be performed in parallel.
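To illustrate why disjoint groups allow parallel reductions, the following Python sketch sums volume per symbol, submitting each group's reduction to a thread pool independently. It is a hypothetical sketch under stated assumptions, not how Pinpoint executes reductions; in CPython, threads do not speed up CPU-bound work, so a real engine would use processes or native parallelism.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from operator import add
from typing import Dict, List


def reduce_per_group(keys: List[str], values: List[int]) -> Dict[str, int]:
    """Sum `values` separately for each distinct key."""
    groups: Dict[str, List[int]] = {}
    for key, value in zip(keys, values):
        groups.setdefault(key, []).append(value)
    # Groups are disjoint, so each reduction is independent and can run concurrently
    with ThreadPoolExecutor() as pool:
        totals = pool.map(lambda vals: reduce(add, vals), groups.values())
        return dict(zip(groups.keys(), totals))


symbol = ["ASDE", "DRVC", "DFSW", "KFCR", "DRVC", "ASDE", "KFCR", "DRVC", "ASDE"]
volume = [10, 12, 20, 15, 12, 25, 12, 15, 22]
print(reduce_per_group(symbol, volume))
# {'ASDE': 57, 'DRVC': 39, 'DFSW': 20, 'KFCR': 27}
```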
