Rishabh Maurya created FLINK-29186:
--------------------------------------

             Summary: Store summarized results in internal nodes of BKD for 
time series points
                 Key: FLINK-29186
                 URL: https://issues.apache.org/jira/browse/FLINK-29186
             Project: Flink
          Issue Type: New Feature
          Components: API / Core
            Reporter: Rishabh Maurya


Time series points have a timestamp, a measurement, and dimensions associated 
with them. Common queries are range queries on the timestamp, metric 
aggregations on the measurement, and grouping by dimensions, or similar queries 
with a histogram on the timestamp.

*Proposal:*

Prototype can be found [here|https://github.com/rishabhmaurya/lucene/pull/1]

1. Introduce a new time series point field in Lucene, `TSPoint`, which can be 
added as follows:

```
Document doc = new Document();
doc.add(new TSIntPoint("tsid1", "cpu", timestamp, measurement));
```

`tsid1` is the time series ID (TSID) of the point. The TSID is the unit of 
storage for time series points; in the prototype, each TSID maps to a unique 
field in Lucene.

`timestamp` is the actual point value on which the BKD index is built.

The full definition can be found 
[here|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/document/TSIntPoint.java].

2. Add an interface for decomposable aggregate functions, which can be defined 
as part of the index configuration. For example, a sum function:

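```java
// Sum merge function: each summary is one int stored as sortable bytes.
BKDSummaryWriter.SummaryMergeFunction<Integer> mergeFunction =
    new BKDSummaryWriter.SummaryMergeFunction<Integer>() {

      // Number of bytes occupied by one packed summary.
      @Override
      public int getSummarySize() {
        return Integer.BYTES;
      }

      // Combine summaries a and b and write the packed result into c.
      @Override
      public void merge(byte[] a, byte[] b, byte[] c) {
        packBytes(unpackBytes(a) + unpackBytes(b), c);
      }

      @Override
      public Integer unpackBytes(byte[] val) {
        return NumericUtils.sortableBytesToInt(val, 0);
      }

      @Override
      public void packBytes(Integer val, byte[] res) {
        NumericUtils.intToSortableBytes(val, res, 0);
      }
    };
```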
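
To make the byte-level mechanics of such a merge concrete, here is a minimal, dependency-free sketch. It assumes the summary is a single int in a sortable encoding (sign bit flipped, big-endian), which mirrors what `NumericUtils.intToSortableBytes` does; the class and method names (`SumMergeSketch`, `pack`, `unpack`, `mergeSum`) are hypothetical and not part of the prototype.

```java
import java.nio.ByteBuffer;

public class SumMergeSketch {

    // Sortable encoding: flip the sign bit, then write big-endian, so that
    // unsigned byte-wise order matches signed integer order.
    static void pack(int value, byte[] dest) {
        ByteBuffer.wrap(dest).putInt(value ^ 0x80000000);
    }

    // Inverse of pack: read big-endian, then flip the sign bit back.
    static int unpack(byte[] src) {
        return ByteBuffer.wrap(src).getInt() ^ 0x80000000;
    }

    // Combine two child summaries into a parent summary (sum is decomposable).
    static void mergeSum(byte[] a, byte[] b, byte[] out) {
        pack(unpack(a) + unpack(b), out);
    }

    public static void main(String[] args) {
        byte[] left = new byte[Integer.BYTES];
        byte[] right = new byte[Integer.BYTES];
        byte[] parent = new byte[Integer.BYTES];
        pack(40, left);
        pack(-15, right);
        mergeSum(left, right, parent);
        System.out.println(unpack(parent)); // prints 25
    }
}
```

A min or max function would have the same shape, with `Math.min` or `Math.max` in place of the addition.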

 

3. Add a new per-`LeafReader` query that performs range queries on `TSPoint` 
and retrieves summarized results:

 

```
LeafReader leafReader; // assumed to be initialized elsewhere
PointValues points = leafReader.getPointValues("tsid1");

TSPointQuery tsPointQuery =
    new TSPointQuery("tsid1", lowerBoundTimestamp, upperBoundTimestamp);

byte[] res = tsPointQuery.getSummary(
    (BKDWithSummaryReader.BKDSummaryTree) points.getPointTree(), mergeFunction);
```

Instead of BKDReader and BKDWriter, we will use 
[BKDSummaryWriter|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/util/bkd/BKDSummaryWriter.java]
 and 
[BKDSummaryReader|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/util/bkd/BKDWithSummaryReader.java],
 which support storing summaries in the internal nodes of the tree. 

The changes to the IntersectVisitor interface are 
[here|https://github.com/rishabhmaurya/lucene/blob/0c07f677c4caf45b1499f236adc0217992e46722/lucene/core/src/java/org/apache/lucene/index/PointValues.java#L320-L337].
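
The idea of answering a range query from summaries stored in internal nodes can be illustrated with a small in-memory sketch. This is not the prototype's BKD format: it assumes a one-dimensional binary tree over a non-empty array of points sorted by timestamp, a sum summary per node, and hypothetical names (`SummaryTreeSketch`, `LEAF_SIZE`, `pointsScanned`). When a node's timestamp range lies entirely inside the query range, its precomputed sum is used and the subtree is never visited.

```java
public class SummaryTreeSketch {
    static final int LEAF_SIZE = 4; // max points per leaf (assumed value)

    static final class Node {
        int from, to;       // half-open slice [from, to) of the sorted arrays
        long minTs, maxTs;  // timestamp bounds of the slice
        long sum;           // precomputed summary for the whole subtree
        Node left, right;   // both null for a leaf
    }

    final long[] timestamps;  // must be sorted ascending and non-empty
    final int[] measurements;
    final Node root;
    int pointsScanned;        // points read individually by the last query

    SummaryTreeSketch(long[] timestamps, int[] measurements) {
        this.timestamps = timestamps;
        this.measurements = measurements;
        this.root = build(0, timestamps.length);
    }

    private Node build(int from, int to) {
        Node n = new Node();
        n.from = from;
        n.to = to;
        n.minTs = timestamps[from];
        n.maxTs = timestamps[to - 1];
        if (to - from <= LEAF_SIZE) {
            for (int i = from; i < to; i++) n.sum += measurements[i];
        } else {
            int mid = (from + to) >>> 1;
            n.left = build(from, mid);
            n.right = build(mid, to);
            n.sum = n.left.sum + n.right.sum; // merge the child summaries
        }
        return n;
    }

    // Sum of measurements with lower <= timestamp <= upper.
    long sum(long lower, long upper) {
        pointsScanned = 0;
        return sum(root, lower, upper);
    }

    private long sum(Node n, long lower, long upper) {
        if (n.maxTs < lower || n.minTs > upper) return 0;         // disjoint
        if (lower <= n.minTs && n.maxTs <= upper) return n.sum;   // fully inside
        if (n.left == null) {                                     // partial leaf
            long s = 0;
            for (int i = n.from; i < n.to; i++) {
                pointsScanned++;
                if (timestamps[i] >= lower && timestamps[i] <= upper) s += measurements[i];
            }
            return s;
        }
        return sum(n.left, lower, upper) + sum(n.right, lower, upper);
    }

    public static void main(String[] args) {
        int n = 1000;
        long[] ts = new long[n];
        int[] m = new int[n];
        for (int i = 0; i < n; i++) { ts[i] = i; m[i] = 1; }
        SummaryTreeSketch tree = new SummaryTreeSketch(ts, m);
        System.out.println(tree.sum(10, 989)); // prints 980
        System.out.println(tree.pointsScanned + " points scanned individually");
    }
}
```

With distinct timestamps, only the leaves containing the two range boundaries are scanned point by point; everything in between is covered by node summaries.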
h4. Comparison with DocValues

Below is a comparison of running the unit test with the 
[DocValue|https://github.com/rishabhmaurya/lucene/pull/1/commits/157215cd2c4787787748625e10b58001583c6e6e#diff-87c31ac3b1cd1ef45b15c84d9cf1c5bab03feb94f199256f859f14ed4747abd2R129]
 approach vs. the 
[TSPoint|https://github.com/rishabhmaurya/lucene/pull/1/commits/157215cd2c4787787748625e10b58001583c6e6e#diff-87c31ac3b1cd1ef45b15c84d9cf1c5bab03feb94f199256f859f14ed4747abd2R52]
 approach:

This test ingests {{10000000}} docs against a given TSID and performs a range 
query on timestamp 100 times against the same TSID. The merge function used is 
{{sum}}.
||DocValues approach||TSPoint approach||
|Indexing took: 42948 ms|Indexing took: 32985 ms|
|Matching docs count: 1304624 \| Segments: 3 \| DiskAccess: 1304624|Matching docs count: 8784032 \| Segments: 10 \| DiskAccess: 302|
|Search took: 12382 ms|Search took: 50 ms|

This is not an apples-to-apples comparison, since the number of segments is 3 
in the DocValues approach whereas it is 10 in the TSPoint approach.
h4. Limitations of this feature
 * Doc deletion is currently not supported. We need to evaluate how important 
it is and possibly find a way to support it in the future.
 * Only 
[decomposable|https://en.wikipedia.org/wiki/Aggregate_function#Decomposable_aggregate_functions]
 aggregation functions can be supported, e.g. min, max, sum, avg, count.
 * Range queries will only be supported on {{timestamp}}.

h4. TODOs
 * Implementation for multiple TSIDs. For now, we need to create a new field 
named after the TSID for each time series.
 * Segment merge for BKD with summaries. Currently, the unit tests disable 
merging, search across multiple segments, and accumulate the results.
 * Pluggable merge function to merge two {{TSPoint}}s. Currently it is 
hardcoded in {{FieldInfo.java}}, which isn't the right place to define it.
 * Measurement compression in BKD. I'm thinking of using delta encoding to 
store measurement values and summaries while packing the summaries associated 
with nodes of the tree.
 * Persist the first and last docID in internal nodes of BKD with summaries in 
an efficient way. This will make it possible to use precomputed summaries and 
skip over batches of documents when iterating with DocIDSetIterator.
 * Benchmark against a real time series dataset.
 ** Compare against the SortedDocValues approach.
 ** Compare against other time series databases.
 * Evaluate support for deleting a document, a time series, or a batch of 
documents (matching a timestamp range).
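
For the measurement compression item, a minimal sketch of plain delta encoding is shown below. It only illustrates the general technique (store the first value, then successive differences); how the prototype would actually pack deltas alongside node summaries is undecided, and `DeltaEncodingSketch`, `encode`, and `decode` are hypothetical names.

```java
import java.util.Arrays;

public class DeltaEncodingSketch {

    // Replace each value with its difference from the previous value.
    // The first entry is the difference from 0, i.e. the value itself.
    static int[] encode(int[] values) {
        int[] deltas = new int[values.length];
        int prev = 0;
        for (int i = 0; i < values.length; i++) {
            deltas[i] = values[i] - prev;
            prev = values[i];
        }
        return deltas;
    }

    // Rebuild the original values with a running sum over the deltas.
    static int[] decode(int[] deltas) {
        int[] values = new int[deltas.length];
        int prev = 0;
        for (int i = 0; i < deltas.length; i++) {
            prev += deltas[i];
            values[i] = prev;
        }
        return values;
    }

    public static void main(String[] args) {
        int[] measurements = {1000, 1003, 1001, 1010};
        int[] deltas = encode(measurements);
        System.out.println(Arrays.toString(deltas));         // prints [1000, 3, -2, 9]
        System.out.println(Arrays.toString(decode(deltas))); // prints [1000, 1003, 1001, 1010]
    }
}
```

The benefit comes from slowly changing measurements producing small deltas, which a subsequent variable-length or bit-packed encoding can store in fewer bytes.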

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
