HBlocks

A New Model For Big Data BI: Realtime and Easy-To-Use

A core element of Apache Hadoop is the MapReduce programming model. MapReduce enables programmers to develop massively parallel programs in languages such as Java and Python. The execution environment handles issues such as automatic fault tolerance and recovery.

The MapReduce model breaks the processing of large data sets into two phases: a Map phase and a Reduce phase. The two phases roughly correspond to the standard Map and Reduce primitives in functional programming, although the model is normally used with imperative programming languages such as Java, rather than with functional languages.

The MapReduce model and its execution environment were designed to support offline batch analytics at scale, and to be used by experienced programmers.

Today, big data analytics is increasingly concerned with processing live realtime data streams. These streams are often machine-generated, and the data arrives continuously at very high velocity. To handle such streams, and deliver maximum competitive advantage, we need a new big data analytics model that enables continuous, always-on realtime analytics, rather than offline batch analytics.

Another important point is that today many of those who need to analyze big data are business users rather than experienced programmers. Many of these business users are familiar with spreadsheets and other productivity tools, but have little or no experience of programming in languages such as Java or Python. So we also need a big data analytics model that provides a higher-level interface than that provided by the MapReduce model.

HBlocks Model

HBlocks is a new, simple, but very powerful big data analytics model that addresses both of these challenges. With HBlocks, ordinary business users can easily develop powerful, massively parallel big data business intelligence applications. The HBlocks model makes no distinction between live realtime data streams and historical data sets, so HBlocks supports both realtime analytics and batch analytics, and does so seamlessly.

Technically, HBlocks is a single-assignment dataflow model, based on a small set of data-parallel functional primitives: map, scan, previous, window, group, select and project. Implementations of the HBlocks model, such as Pinpoint, augment these core primitives with a rich library of built-in functions.

Map

The map primitive creates a new column by applying a row-wise operation to one or more existing columns. For example, suppose we have two columns price and volume, where 

price 20.50 18.02 7.59 10.05 ...
volume 10 12 20 15 ...

and we define the data-parallel map operation 

total = price * volume

then the new column will be 

total 205.00 216.24 151.80 150.75 ...
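To make the map primitive concrete, here is a minimal Python sketch. It assumes columns are held as plain Python lists of equal length; the helper name `hmap` is hypothetical and is not an HBlocks or Pinpoint API.

```python
from typing import Callable, List, Sequence


def hmap(fn: Callable[..., float], *columns: Sequence[float]) -> List[float]:
    """Apply fn row by row across one or more equal-length columns (hypothetical helper)."""
    # zip walks the columns in lockstep, so each call to fn sees exactly one row
    return [fn(*row) for row in zip(*columns)]


price = [20.50, 18.02, 7.59, 10.05]
volume = [10, 12, 20, 15]

# total = price * volume
total = hmap(lambda p, v: p * v, price, volume)
print([round(t, 2) for t in total])  # [205.0, 216.24, 151.8, 150.75]
```

Because each output row depends only on the same row of the input columns, the rows could be split across workers and processed in parallel.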

Window

The time window primitive partitions the stream into disjoint, contiguous substreams of a fixed time duration. In the HBlocks model, every row has a unique (timestamp, index) pair, and the rows are linearly ordered by that pair of values. Timestamps never decrease along the stream, but several rows may share the same timestamp; index values define the order of rows that share a timestamp. Suppose you have the stream

timestamp 12:00 12:01 12:05 12:10 12:13 12:13 12:17 ...
volume 10 12 20 15 8 14 21 ...

and you partition it with five-minute windows aligned on 12:00, then the stream starts with the four substreams shown below (window boundaries marked by |)

timestamp 12:00 12:01 | 12:05 | 12:10 12:13 12:13 | 12:17 ...
volume 10 12 | 20 | 15 8 14 | 21 ...
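A minimal Python sketch of time windowing follows. It assumes timestamps are non-decreasing minutes since midnight and represents the windowed stream as a list of window ids, one per row; `time_window_ids` and `to_minutes` are hypothetical helpers, not part of any HBlocks implementation.

```python
from typing import Dict, List


def to_minutes(hhmm: str) -> int:
    """Convert an "HH:MM" string to minutes since midnight."""
    hours, mins = hhmm.split(":")
    return int(hours) * 60 + int(mins)


def time_window_ids(minutes: List[int], width: int, origin: int) -> List[int]:
    """Return a window id per row; rows sharing an id form one substream.

    Because timestamps never decrease, rows in the same fixed-width window
    are always contiguous, and rows with equal timestamps keep their order.
    """
    return [(t - origin) // width for t in minutes]


timestamps = ["12:00", "12:01", "12:05", "12:10", "12:13", "12:13", "12:17"]
volume = [10, 12, 20, 15, 8, 14, 21]

ids = time_window_ids([to_minutes(t) for t in timestamps], width=5, origin=to_minutes("12:00"))
print(ids)  # [0, 0, 1, 2, 2, 2, 3]

# Collect the substreams themselves from the window ids
windows: Dict[int, List[int]] = {}
for wid, v in zip(ids, volume):
    windows.setdefault(wid, []).append(v)
print(list(windows.values()))  # [[10, 12], [20], [15, 8, 14], [21]]
```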

A more general form of the window primitive is the "punctuation window". Given a column of boolean values, a new substream is started at each row where the value is true, and the previous substream is closed. For example, given the columns

punc false true false false true true false ...
volume 10 12 20 15 8 14 21 ...

a punctuation window on the column punc would give (window boundaries marked by |)

punc false | true false false | true | true false ...
volume 10 | 12 20 15 | 8 | 14 21 ...

Punctuation windows allow streams to be partitioned not only by time, but by other criteria such as size (for example, windows of 50,000 rows) or events (for example, starting a new window as soon as an important event is detected).
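The punctuation rule can be sketched in a few lines of Python. This assumes the punctuation column and the data column are equal-length lists; `punctuation_windows` is a hypothetical helper, and the "every three rows" column is only an illustration of how a size-based window reduces to a punctuation column.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def punctuation_windows(punc: Sequence[bool], values: Sequence[T]) -> List[List[T]]:
    """Split values into substreams; a True in punc closes the open window
    and starts a new one at that row (hypothetical helper)."""
    windows: List[List[T]] = []
    current: List[T] = []
    for flag, value in zip(punc, values):
        if flag and current:  # close the open window; a True on the first row opens no empty window
            windows.append(current)
            current = []
        current.append(value)
    if current:  # in a live stream this last window would still be open
        windows.append(current)
    return windows


punc = [False, True, False, False, True, True, False]
volume = [10, 12, 20, 15, 8, 14, 21]
print(punctuation_windows(punc, volume))  # [[10], [12, 20, 15], [8], [14, 21]]

# A size-based window (here every 3 rows) is just a particular punctuation column
rows = list(range(7))
every_three = [i > 0 and i % 3 == 0 for i in rows]
print(punctuation_windows(every_three, rows))  # [[0, 1, 2], [3, 4, 5], [6]]
```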

Time and punctuation windows allow window functions to be used in the HBlocks model. For example, if we have the following stream with the time windows shown (window boundaries marked by |)

timestamp 12:00 12:01 | 12:05 | 12:10 12:13 12:13 | 12:17 ...
volume 10 12 | 20 | 15 8 14 | 21 ...

then we can create a new column using the "window_sum" function, which gives every row the sum of volume over its window

timestamp 12:00 12:01 | 12:05 | 12:10 12:13 12:13 | 12:17 ...
volume 10 12 | 20 | 15 8 14 | 21 ...
sum 22 22 | 20 | 37 37 37 | 21+ ...

The last window is still open, so its sum so far is shown as 21+.

Implementations of the HBlocks model, such as Pinpoint, contain many powerful window functions for reduction, ranking, sorting etc. Since windows are disjoint, the computation of window functions on all the windows in a stream can be performed in parallel.
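A minimal Python sketch of a window_sum-style function is shown below. It assumes the windowed stream is given as a list of window ids, one per row, with each window's rows contiguous; this signature is an assumption for illustration, not Pinpoint's. The thread pool only stands in for whatever parallel runtime a real implementation would use.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Sequence


def window_sum(values: Sequence[float], window_ids: Sequence[int]) -> List[float]:
    """Sum each window and repeat that sum on every row of the window."""
    windows: Dict[int, List[float]] = {}
    for wid, v in zip(window_ids, values):
        windows.setdefault(wid, []).append(v)
    # Windows are disjoint, so each sum is independent and can run in parallel
    with ThreadPoolExecutor() as pool:
        sums = dict(zip(windows.keys(), pool.map(sum, windows.values())))
    # Broadcast each window's sum back onto its rows
    return [sums[wid] for wid in window_ids]


volume = [10, 12, 20, 15, 8, 14, 21]
window_ids = [0, 0, 1, 2, 2, 2, 3]  # five-minute windows aligned on 12:00
print(window_sum(volume, window_ids))  # [22, 22, 20, 37, 37, 37, 21]
```

In a live stream the value for the final window would be provisional until that window closes, which is what the 21+ in the table above indicates.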

Select

Given a column of boolean values, the select primitive discards all rows for which the value is false. For example, given the columns

filter false true false false true true false ...
volume 10 12 20 15 8 14 21 ...

a select on the column filter would give

filter true true true ...
volume 12 8 14 ...
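The select primitive can be sketched over a table held as a dict of equal-length column lists. This representation and the `select` helper are assumptions for illustration only.

```python
from typing import Dict, List


def select(table: Dict[str, List], flag: str) -> Dict[str, List]:
    """Keep only the rows where column `flag` is True (hypothetical helper)."""
    keep = [i for i, f in enumerate(table[flag]) if f]
    # Every column is filtered with the same row positions, so rows stay aligned
    return {name: [col[i] for i in keep] for name, col in table.items()}


table = {
    "filter": [False, True, False, False, True, True, False],
    "volume": [10, 12, 20, 15, 8, 14, 21],
}
print(select(table, "filter"))
# {'filter': [True, True, True], 'volume': [12, 8, 14]}
```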

Project

Given a set S of columns, the project primitive discards all columns other than those in S. 
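Using the same assumed dict-of-columns representation, project is a one-line filter on column names; `project` is a hypothetical helper, and the sample values are taken from the price, volume and symbol columns used elsewhere in this article.

```python
from typing import Dict, Iterable, List


def project(table: Dict[str, List], keep: Iterable[str]) -> Dict[str, List]:
    """Discard every column whose name is not in `keep`; rows are untouched."""
    wanted = set(keep)
    return {name: col for name, col in table.items() if name in wanted}


table = {
    "price": [20.50, 18.02, 7.59, 10.05],
    "volume": [10, 12, 20, 15],
    "symbol": ["ASDE", "DRVC", "DFSW", "KFCR"],
}
print(project(table, {"price", "symbol"}))
# {'price': [20.5, 18.02, 7.59, 10.05], 'symbol': ['ASDE', 'DRVC', 'DFSW', 'KFCR']}
```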

Previous

The previous primitive creates a new column by applying a row-shifting operation to the selected column. For example, suppose we have 

price 20.50 18.02 7.59 10.05 ...

and we define the data-parallel previous operation 

last_one = previous(price)

then the new column will be 

last_one default 20.50 18.02 7.59 ...
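A minimal Python sketch of previous as a one-row shift is shown below. The helper name is hypothetical, and using 0.0 as the default for a numeric column is an assumption, since the article does not say what the default value is for each type.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def previous(column: Sequence[T], default: T) -> List[T]:
    """Shift a column down one row; the first row receives `default`."""
    if not column:
        return []
    return [default] + list(column[:-1])


price = [20.50, 18.02, 7.59, 10.05]
# Assumption: 0.0 stands in for the default value of a numeric column
last_one = previous(price, default=0.0)
print(last_one)  # [0.0, 20.5, 18.02, 7.59]
```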

where default is the default value for that type of column. Note that in the HBlocks model, circular definitions are invalid, so defining an operation such as 

price = previous(price) + 10.8

is not allowed. Multiple definitions containing a circularity are also prohibited, for example

last_one = previous(price)

price = last_one + 7

These kinds of iterative definitions can, however, be specified using the scan primitive (see below).
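One way an implementation might enforce the no-circularity rule is to build a dependency graph of column definitions and reject any cycle. The sketch below is hypothetical (not how Pinpoint is known to do it) and uses Python's standard `graphlib` module (Python 3.9+); it counts a column read through previous() as a dependency, matching the rule that previous(price) cannot feed back into price.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List, Set


def check_definitions(deps: Dict[str, Set[str]]) -> List[str]:
    """Return an evaluation order for column definitions, or raise CycleError.

    deps maps each defined column to the columns its definition reads,
    including columns read through previous().
    """
    return list(TopologicalSorter(deps).static_order())


# Valid: total = price * volume; last_one = previous(price)
print(check_definitions({"total": {"price", "volume"}, "last_one": {"price"}}))

invalid = [
    {"price": {"price"}},  # price = previous(price) + 10.8
    {"last_one": {"price"}, "price": {"last_one"}},  # last_one = previous(price); price = last_one + 7
]
for deps in invalid:
    try:
        check_definitions(deps)
    except CycleError as err:
        print("rejected, cycle:", err.args[1])  # args[1] holds the detected cycle
```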

Group

The group primitive partitions the stream into disjoint substreams based on the values in the "groupby" column. A separate substream is created for each unique value in the groupby column. For example, suppose we have

price 20.50 18.02 7.59 10.05 17.98 20.48 10.02 17.96 20.48 ...
volume 10 12 20 15 12 25 12 15 22 ...
symbol ASDE DRVC DFSW KFCR DRVC ASDE KFCR DRVC ASDE ...

and we group by symbol, then we have 

price 20.50 20.48 20.48 ...
volume 10 25 22 ...
symbol ASDE ASDE ASDE ...

 

price 18.02 17.98 17.96 ...
volume 12 12 15 ...
symbol DRVC DRVC DRVC ...

 

price 7.59 ...
volume 20 ...
symbol DFSW ...

 

price 10.05 10.02 ...
volume 15 12 ...
symbol KFCR KFCR ...

Since groups are disjoint, the computation of functions on all the groups in a stream can be performed in parallel.
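The grouping above can be reproduced with a short Python sketch, again assuming a dict-of-columns table; `group_by` is a hypothetical helper. Python dicts keep insertion order, so groups appear in order of first occurrence and each group keeps its rows in stream order.

```python
from typing import Any, Dict, List


def group_by(table: Dict[str, List], key: str) -> Dict[Any, Dict[str, List]]:
    """Split a table into one sub-table per distinct value of column `key`."""
    groups: Dict[Any, Dict[str, List]] = {}
    for i, k in enumerate(table[key]):
        if k not in groups:
            groups[k] = {name: [] for name in table}
        for name, col in table.items():
            groups[k][name].append(col[i])
    return groups


table = {
    "price": [20.50, 18.02, 7.59, 10.05, 17.98, 20.48, 10.02, 17.96, 20.48],
    "volume": [10, 12, 20, 15, 12, 25, 12, 15, 22],
    "symbol": ["ASDE", "DRVC", "DFSW", "KFCR", "DRVC", "ASDE", "KFCR", "DRVC", "ASDE"],
}
for symbol, group in group_by(table, "symbol").items():
    print(symbol, group["price"], group["volume"])
# ASDE [20.5, 20.48, 20.48] [10, 25, 22]
# DRVC [18.02, 17.98, 17.96] [12, 12, 15]
# DFSW [7.59] [20]
# KFCR [10.05, 10.02] [15, 12]
```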

Scan

The scan primitive creates a new column by applying an iterative operation to the selected column. For example, suppose we have 

price 20.50 18.02 7.59 10.05 ...

and we define the data-parallel scan operation 

prefix_sum = scan( + , price )

then the new column will be 

prefix_sum 20.50 38.52 46.11 56.16 ...
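Python's `itertools.accumulate` is an inclusive scan, so it can stand in for scan( + , price ) in a quick sketch; this is a swapped-in standard-library function, not an HBlocks API.

```python
import operator
from itertools import accumulate

price = [20.50, 18.02, 7.59, 10.05]

# prefix_sum = scan( + , price ): each output row combines the previous result with the current row
prefix_sum = list(accumulate(price, operator.add))
print([round(x, 2) for x in prefix_sum])  # [20.5, 38.52, 46.11, 56.16]
```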

Similarly, if we have the windowed stream (window boundaries marked by |)

timestamp 12:00 12:01 | 12:05 | 12:10 12:13 12:13 | 12:17 ...
volume 10 12 | 20 | 15 8 14 | 21 ...

and we define the data-parallel scan operation 

prefix_min = scan( min , volume )

then the scan restarts at the start of each window, and the new stream will be 

timestamp 12:00 12:01 | 12:05 | 12:10 12:13 12:13 | 12:17 ...
volume 10 12 | 20 | 15 8 14 | 21 ...
prefix_min 10 10 | 20 | 15 8 8 | 21 ...
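A windowed scan can be sketched by running an inclusive scan separately over each run of rows that share a window id. The list-of-window-ids representation and the `windowed_scan` name are assumptions for illustration.

```python
from itertools import accumulate
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def windowed_scan(fn: Callable[[T, T], T], values: Sequence[T], window_ids: Sequence[int]) -> List[T]:
    """Run an inclusive scan that restarts whenever the window id changes."""
    out: List[T] = []
    start = 0
    for i in range(1, len(values) + 1):
        # A window ends at the end of the stream or where the id changes
        if i == len(values) or window_ids[i] != window_ids[start]:
            out.extend(accumulate(values[start:i], fn))
            start = i
    return out


volume = [10, 12, 20, 15, 8, 14, 21]
window_ids = [0, 0, 1, 2, 2, 2, 3]  # five-minute windows aligned on 12:00
print(windowed_scan(min, volume, window_ids))  # [10, 10, 20, 15, 8, 8, 21]
```

Each window's scan touches only that window's rows, so, as with window functions, the windows could be scanned in parallel.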

Implementations of the HBlocks model, such as Pinpoint, have a number of these simple scan functions built-in (prefix_sum, prefix_min etc.) as well as the general mechanism for user-defined scan functions.
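As an illustration of a user-defined scan function, the sketch below expresses an exponential moving average as a scan, which is the kind of iterative definition that previous cannot express. The EMA, the `ema_step` name and alpha = 0.5 are all hypothetical choices, and `itertools.accumulate` again stands in for the scan primitive.

```python
from itertools import accumulate
from typing import Callable


def ema_step(alpha: float) -> Callable[[float, float], float]:
    """Build a scan step: new result = alpha * current row + (1 - alpha) * previous result."""
    def step(acc: float, x: float) -> float:
        return alpha * x + (1 - alpha) * acc
    return step


price = [20.50, 18.02, 7.59, 10.05]
# accumulate passes the first price through unchanged, which seeds the average
ema = list(accumulate(price, ema_step(0.5)))
print(ema)  # approximately [20.5, 19.26, 13.425, 11.7375]
```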

Reduce

As noted above, the HBlocks model provides several ways of performing a reduce operation on columns, such as window_sum and prefix_sum. There is therefore no need for a separate reduce primitive. Since windows and groups are disjoint, reductions on all the windows or groups in a stream can be computed in parallel.
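To illustrate a per-group reduction computed in parallel, the sketch below sums volume for each symbol using the group example data. The thread pool is only a stand-in for an implementation's parallel runtime, and producing one total per group (rather than broadcasting it back onto every row, as window_sum does) is a choice made for brevity.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List

symbol = ["ASDE", "DRVC", "DFSW", "KFCR", "DRVC", "ASDE", "KFCR", "DRVC", "ASDE"]
volume = [10, 12, 20, 15, 12, 25, 12, 15, 22]

# Group the volume column by symbol (dict order follows first appearance)
groups: Dict[str, List[int]] = {}
for s, v in zip(symbol, volume):
    groups.setdefault(s, []).append(v)

# Groups are disjoint, so each reduction is independent of the others
with ThreadPoolExecutor() as pool:
    totals = dict(zip(groups, pool.map(sum, groups.values())))

print(totals)  # {'ASDE': 57, 'DRVC': 39, 'DFSW': 20, 'KFCR': 27}
```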