Pinpoint

Pinpoint is the internal language used for analyzing, filtering, and transforming data in HRule. Its role is similar to that played by MDX in querying the multidimensional data stored in a traditional offline data warehouse, or by SQL in querying an offline relational database. The Pinpoint language is based on the HBlocks model, and can be used to process both live realtime streams and massive historical data sets. The HSheets user interface enables users (business users, data scientists, app developers, and others) to build powerful Pinpoint agents quickly and easily by dragging and dropping building blocks. Each HSheets building block corresponds to a single Pinpoint expression.
Streams
A Pinpoint application processes one or more streams, and produces an output stream. A stream consists of a finite set of columns. For example, a market data stream might consist of three columns containing ticker symbol, price and volume information. A stream can also be considered as a finite or infinite timestamp-ordered sequence of rows, where each row contains exactly one element from each of the columns. For example, a market data stream might contain all of the NASDAQ Last Trade data from Monday 11th July 2011 at 12 noon UTC Time to Friday 2nd September 2011 at 10.30pm UTC Time. Another stream might contain every tweet sent to the Twitter site from Tuesday 15th March 2011 at 8.45am UTC Time up to the present time.
Streams might contain thousands, millions or billions of rows per hour. There is no upper limit. Continuously updated streams are essentially infinite, and continuous never-ending analytics computations can be run on them. The timestamps for each row are accurate to the microsecond level, and where two consecutive rows have identical timestamps, their ordering is precisely defined using unique row identifiers for each timestamp.
The values in a single column of a stream must be of the same type. Columns can be numeric, logical, time-related, string, or array. For example, the symbol, price and volume columns of a market data stream would typically be of type string, numeric (real), and numeric (integer) respectively.
Stream Merging
When a single application is run on more than one stream, the streams are merged or interleaved together in time-order to form a single stream, using row identifiers to resolve any ambiguity.
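The merge rule can be sketched in Python as a k-way merge keyed on timestamp, with the row identifier as tie-breaker. This is only an illustration: the tuple layout (timestamp_us, row_id, payload), the function name merge_streams, and the assumption that row identifiers are comparable across streams are all hypothetical and not taken from Pinpoint itself.

```python
import heapq

def merge_streams(*streams):
    """Merge timestamp-ordered streams into one timestamp-ordered stream.

    Each row is assumed to be a tuple (timestamp_us, row_id, payload).
    The row_id breaks ties between rows with identical timestamps.
    """
    return list(heapq.merge(*streams, key=lambda row: (row[0], row[1])))

# Two small, already time-ordered streams (illustrative data).
trades = [(1_000, 1, "trade A"), (3_000, 1, "trade B")]
quotes = [(1_000, 2, "quote A"), (2_000, 1, "quote B")]

merged = merge_streams(trades, quotes)
for row in merged:
    print(row)
```

The two rows stamped 1_000 are ordered by their row identifiers, so the interleaving is deterministic.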
Window Substreams
A Pinpoint application can perform certain parts of the computation, either sequentially or concurrently, on disjoint windows of a stream instead of performing it on the stream as a whole. Windows are either time-based or event-based.
A time-based window partitions the rows of the stream into disjoint contiguous time intervals of a certain length (number of microseconds). For example, part of a Pinpoint app on a market data stream might need to partition the rows of the stream into 5-minute windows, while another part might need to simultaneously process the same data in 40-second windows, and yet another might need to process the whole of the same stream together as a single unit in order to continuously check for a particular pattern that might appear anywhere in the stream.
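As a rough sketch of time-based windowing, the window containing a row can be found by integer division of its timestamp by the window duration. The function name window_index and the alignment_us parameter are assumptions made for this example, not Pinpoint names.

```python
FIVE_MINUTES_US = 5 * 60 * 1_000_000
FORTY_SECONDS_US = 40 * 1_000_000

def window_index(timestamp_us, duration_us, alignment_us=0):
    """Return the index of the time-based window containing a timestamp.

    Windows are disjoint, contiguous, and duration_us microseconds long;
    window 0 starts at alignment_us (a hypothetical alignment point).
    """
    return (timestamp_us - alignment_us) // duration_us

# A row stamped 7 minutes after the alignment point.
t = 7 * 60 * 1_000_000
print(window_index(t, FIVE_MINUTES_US))   # second 5-minute window (index 1)
print(window_index(t, FORTY_SECONDS_US))  # eleventh 40-second window (index 10)
```

The same row falls into different windows depending on the duration, which is how two parts of one app can window the same data differently.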
An event-based window (called a "Window Frame" in Pinpoint) is more general, and allows a stream to be partitioned into disjoint contiguous intervals based on any combination of criteria. For example, a particular part of a Pinpoint app might need to process the stream in batches of one million rows, irrespective of how long those batches are in terms of time. Another part might need to process the stream in batches which end every time a particular combination of events is detected in the stream: for example, every time a large trade in a particular stock is detected and the time since the last such event is at least 200 seconds.
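An event-based window can be sketched as a loop that closes the current frame whenever a boolean predicate fires. The function split_into_frames and its closes_frame callback are hypothetical; in particular, treating the triggering row as the last row of its frame is an assumption of this sketch.

```python
def split_into_frames(rows, closes_frame):
    """Split rows into disjoint contiguous frames.

    closes_frame(row, frame) returns True when `row` is the last row of
    the current frame; the row itself is included in that frame.
    """
    frames, frame = [], []
    for row in rows:
        frame.append(row)
        if closes_frame(row, frame):
            frames.append(frame)
            frame = []
    if frame:
        frames.append(frame)  # trailing frame that has not closed yet
    return frames

# Fixed-size batches: 3 rows here, standing in for one million.
rows = list(range(7))
batches = split_into_frames(rows, lambda row, frame: len(frame) == 3)
print(batches)
```

Any other criterion, such as detecting a large trade after a quiet period, would be expressed by swapping in a different predicate.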
Group Substreams
A Pinpoint application can also perform certain parts of the computation, either sequentially or concurrently, on disjoint substreams instead of performing it on the stream as a whole. For example, part of a Pinpoint app on a NASDAQ market data stream might need to partition the rows of the stream into disjoint substreams, in which each substream contains only those rows with a particular ticker symbol. In such a case, there would be a substream for the rows containing the ticker symbol GOOG, another for the rows containing MSFT, another for ORCL etc. In total there would be more than 3200 distinct substreams being processed simultaneously, corresponding to the number of companies whose stocks are traded on NASDAQ. The number of substreams in a Pinpoint app is limited only by the available resources. An app may process a stream as a disjoint collection of thousands or millions of substreams.
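Grouping into substreams can be illustrated with a dictionary keyed on the grouping column. The row layout (symbol, price, volume) and the function name group_substreams are assumptions for the sake of the example.

```python
from collections import defaultdict

def group_substreams(rows, key):
    """Partition rows into disjoint substreams, preserving row order."""
    substreams = defaultdict(list)
    for row in rows:
        substreams[key(row)].append(row)
    return dict(substreams)

# Illustrative rows: (symbol, price, volume).
rows = [("GOOG", 530.1, 100), ("MSFT", 26.5, 300), ("GOOG", 530.4, 50)]
by_symbol = group_substreams(rows, key=lambda r: r[0])
for symbol, substream in by_symbol.items():
    print(symbol, substream)
```

Each row lands in exactly one substream, and rows within a substream keep their original time order.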
Stream Partitions
Normally, a Pinpoint app produces a single output stream. However, by setting simple parameters, it can instead partition the output stream into a set of disjoint substreams that are output as separate streams. For example, a Pinpoint app processing a realtime stream of Facebook user data might output the results as ten separate streams based on the Facebook user id: one stream containing the rows for those users whose id ends with the digit 0, another for those ending in 1, and eight more for those ending in the digits 2 through 9.
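The ten-way split in the example amounts to routing each row by the last decimal digit of the user id. A minimal sketch follows, assuming non-negative integer ids; the function name and row layout are hypothetical.

```python
def partition_by_last_digit(rows, user_id_of):
    """Route each row to one of ten output streams by last digit of user id.

    Assumes user ids are non-negative integers.
    """
    outputs = [[] for _ in range(10)]
    for row in rows:
        outputs[user_id_of(row) % 10].append(row)
    return outputs

# Illustrative rows: (user_id, event).
rows = [(120, "login"), (457, "like"), (30, "post"), (7, "share")]
streams = partition_by_last_digit(rows, user_id_of=lambda r: r[0])
print(streams[0])  # rows for ids ending in 0
print(streams[7])  # rows for ids ending in 7
```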
External Data Structures
A Pinpoint app performs a functional computation on the values in the columns of the stream. An app may also read values from, and write values to, external data structures outside the stream. These private data structures are local to the application. They cannot be shared across apps, and are not persistent. For example, a Pinpoint app processing a stream of events associated with Facebook users might use an external hash table to store information about the set of unique Facebook users identified so far in the event stream.
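The unique-users example might look like the following in Python, with a built-in set standing in for an app-local hash set. The function name and the choice to emit a running count per row are assumptions of this sketch.

```python
def running_unique_users(user_ids):
    """For each event, return how many distinct users have been seen so far."""
    seen = set()  # app-local structure: not shared, not persistent
    counts = []
    for user_id in user_ids:
        seen.add(user_id)
        counts.append(len(seen))
    return counts

counts = running_unique_users([7, 3, 7, 9])
print(counts)
```

The set lives only for the duration of the call, mirroring the statement that such structures are private to the app and not persistent.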
External Data Stores
A Pinpoint app can get data from, and put data to, external data stores that are persistent, and which can be shared across any number of Pinpoint apps. For example, a set of Pinpoint apps processing different eCommerce event streams in various ways might use an external persistent key-value data store to access and update information about customers and products. These persistent external data stores can be very large (multi-Terabyte or more) and are implemented as distributed, fault tolerant, highly scalable structures.
Applications
A Pinpoint application can be fully specified by giving:
- one or more input streams
- start time for the interval
- end time for the interval
- select expression (optional)
- parallel value (optional)
- set of input columns
- set of expression columns
The optional select expression defines the set of rows that should be included in the output stream. By default, all rows are output. In a Pinpoint app, the rows in a stream cannot be rearranged, and new rows cannot be added. The set of rows can, however, be reduced by a select expression.
The optional parallel value defines the scale of MapReduce-style parallelism that should be used in carrying out the Pinpoint computation. The default value is zero, which means that the computation should be carried out as a single task. If the value is set to n, where n>0, and the length of the interval for the computation is m minutes, then the computation is divided into k disjoint parallel subcomputations, where k=m/n. For example, if the Pinpoint app is being run to process 8 hours of data and the parallel value is set to 12, then the Pinpoint app will be run as 480/12=40 separate disjoint parallel computations, giving up to a 40x speedup.
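The arithmetic for the parallel value can be checked with a few lines of Python. The text only gives k=m/n, so rounding up when n does not divide m evenly is an assumption of this sketch, as is the function name.

```python
def subcomputation_count(interval_minutes, parallel):
    """Number of disjoint subcomputations for a given parallel value.

    parallel == 0 means a single task. Otherwise k = m / n, rounded up
    here (an assumption) so that a partial final chunk is still covered.
    """
    if parallel == 0:
        return 1
    return -(-interval_minutes // parallel)  # ceiling division

k = subcomputation_count(8 * 60, 12)  # 8 hours of data, parallel value 12
print(k)
```

For the 8-hour example this reproduces the 40 subcomputations stated above, each covering 12 minutes of the interval.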
The set of input columns defines the columns of the stream that will be used by the Pinpoint app. For example, an app running on the Twitter stream may only require two or three of the columns in that stream. The other 30+ columns in that stream will be ignored by the computation.
The set of expression columns in a Pinpoint app defines the new columns that are computed when the app is run. Each expression creates a new column. The order of expressions in a Pinpoint app is not significant: the set of expressions can appear in any order. Expressions can contain the standard operators found in most programming languages: arithmetic operators (+, -, *, /, %), boolean logic operators (and, or, not), and relational operators (==, !=, <, <=, >, >=). Expressions can also contain functions drawn from a library of over 200 built-in Pinpoint functions. An expression column can have an optional GroupBy and/or an optional Window associated with it (see Group Substreams and Window Substreams above). If it has a Window, it can either be an event-based window (Window Frame), in which case it is specified by a boolean expression, or a time-based window, in which case it is specified by giving an alignment and duration.
Example App
Continuously analyze the NASDAQ Last Trade data stream and report the 10-minute Volume Weighted Average Price (VWAP) of each of the more than 3,200 companies traded on NASDAQ.
Input stream: NASDAQ Last Trade
Start Time: Monday 11th July 2011. 12 noon UTC Time
End Time: Friday 2nd September 2011. 10.30pm UTC Time
Select: first_row
Parallel: 240
Inputs: symbol, price, volume
vwap = sum(price*volume)/sum(volume) [ Window:(10mins, aligned:12noon) ; GroupBy:symbol ]
first_row = window_start [ Window:(10mins, aligned:12noon) ; GroupBy:symbol ]
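To make the computation concrete, here is a rough Python equivalent of the vwap expression: sum(price*volume)/sum(volume), evaluated separately for each symbol and each 10-minute window. It is a sketch only. The row layout (timestamp_us, symbol, price, volume), the function name, and the choice to return one value per (symbol, window) pair are assumptions, and it does not model the Select or Parallel settings.

```python
from collections import defaultdict

TEN_MINUTES_US = 10 * 60 * 1_000_000

def vwap_per_window(rows, duration_us=TEN_MINUTES_US, alignment_us=0):
    """Return {(symbol, window_index): vwap}.

    rows are tuples of (timestamp_us, symbol, price, volume).
    """
    notional = defaultdict(float)  # running sum(price * volume)
    volume = defaultdict(int)      # running sum(volume)
    for timestamp_us, symbol, price, vol in rows:
        key = (symbol, (timestamp_us - alignment_us) // duration_us)
        notional[key] += price * vol
        volume[key] += vol
    # Skip groups with zero total volume to avoid dividing by zero.
    return {k: notional[k] / volume[k] for k in volume if volume[k] > 0}

rows = [
    (0, "GOOG", 10.0, 100),
    (60_000_000, "GOOG", 20.0, 300),
    (60_000_000, "MSFT", 5.0, 10),
    (TEN_MINUTES_US, "GOOG", 30.0, 1),  # first row of the next window
]
result = vwap_per_window(rows)
for key, value in sorted(result.items()):
    print(key, value)
```

In the first GOOG window the VWAP is (10*100 + 20*300) / 400 = 17.5, which sits closer to the price of the larger trade.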
Functions
- Generic: maximum, minimum
- Numeric: arithmetic, random, scientific, trigonometric
- Logical: boolean, conditional, relational
- Time: duration, timestamp, year, month, day, hour, minute, second
- String: append, empty, find_char, find_substring, insert_string, regex_match, remove_substring, replace_substring, return_char, score_lucene_query, size
- Array: find_subarray, find_value, insert_value, remove_value, return_value, set_value
- Stream: match_pattern, prefix, previous, scan
- Window: all_true, any_true, bottom_k, end, first, frequency, k_random_rows, last, length, longest_decreasing_subsequence, longest_increasing_subsequence, maximum, minimum, product, rank, reverse, shortest_path, sort, start, sum, top_k, unique_values
- External Data Structures: deque, hash_map, hash_set, tree_map, tree_set, vector
- External Data Stores: get, put
- Types: convert_type, is_finite, is_infinite, is_nan, is_zero
- Defaults: default_value, maximum_value, minimum_value, zero_value
