Jark Wu created FLINK-21109:
-------------------------------

             Summary: Introduce "retractAccumulators" interface for 
AggregateFunction in Table/SQL API
                 Key: FLINK-21109
                 URL: https://issues.apache.org/jira/browse/FLINK-21109
             Project: Flink
          Issue Type: New Feature
          Components: Table SQL / API
            Reporter: Jark Wu


*Motivation*

The motivation is to improve the performance of hopping (sliding) windows.
Currently, Table/SQL applies a pane (also called slice) optimization to 
hopping windows: 
each element is accumulated into a single pane, and once a window is fired, 
we merge multiple panes to get the window result. 

For example, with HOP(size=10s, slide=2s), window [0, 10) consists of 5 panes: 
[0, 2), [2, 4), [4, 6), [6, 8), [8, 10).
Each element falls into a single pane, e.g. an element with timestamp 3 
falls into pane [2, 4).
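
The pane layout above can be illustrated with a small standalone sketch. It is not Flink's internal implementation: the class and method names (PaneAssigner, paneStart, panesOfWindow) are hypothetical, timestamps are plain longs in seconds, and the window size is assumed to be a multiple of the slide.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: illustrates how panes relate to a hopping window.
class PaneAssigner {

    /** Start of the pane [k * slide, (k + 1) * slide) that contains the timestamp. */
    static long paneStart(long timestamp, long slide) {
        return Math.floorDiv(timestamp, slide) * slide;
    }

    /**
     * Start timestamps of the panes that make up window [windowStart, windowStart + size).
     * Assumes size is a multiple of slide.
     */
    static List<Long> panesOfWindow(long windowStart, long size, long slide) {
        List<Long> panes = new ArrayList<>();
        for (long start = windowStart; start < windowStart + size; start += slide) {
            panes.add(start);
        }
        return panes;
    }
}
```

With slide=2, paneStart(3, 2) returns 2, i.e. pane [2, 4), and panesOfWindow(0, 10, 2) lists the five pane starts 0, 2, 4, 6, 8.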

However, the panes are currently merged in JVM heap memory. For example, 
when window [0, 10) is about to fire,
we retrieve the accumulators of its 5 panes and merge them into an 
in-memory accumulator. 
This performs poorly because the number of panes can be very large when 
the slide is small, e.g. 8640 panes for HOP(size=1day, slide=10s).
The job may also run out of memory when the accumulator is very large, e.g. 
when it contains a count distinct. 

Thus, I would like to introduce a "retractAccumulators()" method, which is the 
inverse of "merge()".
With "retractAccumulators()", we can reduce the cost of firing a window from 
O(N) to O(1), where N is the number of panes per window.
For example, when window [10, 20) is about to fire, we only need to 
retract the accumulator of pane [8, 10) from the state of the previous window 
[8, 18) and merge the accumulator of pane [18, 20) into it. 
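
To make the difference concrete, here is a minimal sketch that uses a plain sum as the aggregate, so that "merge" is addition and "retract" is subtraction. The names (SlidingSumSketch, mergeAllPanes, slideForward) and the map of per-pane sums are assumptions made for illustration only; real accumulators and state access in Flink look different.

```java
import java.util.Map;

// Hypothetical sketch: per-pane sums keyed by pane start timestamp.
class SlidingSumSketch {

    /** O(N) path: merge every pane of the window into a fresh accumulator. */
    static long mergeAllPanes(Map<Long, Long> paneSums, long windowStart, long size, long slide) {
        long acc = 0L;
        for (long pane = windowStart; pane < windowStart + size; pane += slide) {
            acc += paneSums.getOrDefault(pane, 0L);
        }
        return acc;
    }

    /**
     * O(1) path: derive the next window's result from the previous window's result by
     * retracting the pane that leaves and merging the pane that enters.
     */
    static long slideForward(Map<Long, Long> paneSums, long prevResult, long prevWindowStart, long size) {
        long expired = paneSums.getOrDefault(prevWindowStart, 0L);        // e.g. pane [8, 10)
        long arrived = paneSums.getOrDefault(prevWindowStart + size, 0L); // e.g. pane [18, 20)
        return prevResult - expired + arrived;
    }
}
```

For HOP(size=10s, slide=2s), calling slideForward with the result of window [8, 18) yields the same value as calling mergeAllPanes for window [10, 20), but it touches two panes instead of five.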

This will be a great performance improvement: the hopping window will have 
performance similar to the tumbling window, no matter how small the slide is. 

*Public Interface*

We will introduce a contract method "retractAccumulators" which is similar to 
the "merge" method.


{code}
/**
 * Retracts a group of accumulator instances from one accumulator instance. This method is
 * optional, but implementing it can greatly improve the performance of hopping window
 * aggregates. Therefore, it is recommended to implement this method when the function is
 * used with hopping windows.
 *
 * @param accumulator the accumulator which will keep the retracted aggregate results. It
 *                    should be noted that the accumulator may contain the previous
 *                    aggregated results. Therefore users should not replace or clean this
 *                    instance in the custom retractAccumulators method.
 * @param retractAccs a java.lang.Iterable pointing to a group of accumulators that will be
 *                    retracted.
 */
public void retractAccumulators(ACC accumulator, java.lang.Iterable<ACC> retractAccs)
{code}
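
As an illustration of how a user-defined function might satisfy this contract, below is a hedged sketch of an average aggregate. To stay compilable without Flink on the classpath, it does not extend Flink's AggregateFunction; it only mirrors the method shapes (createAccumulator, accumulate, merge, getValue) alongside the proposed retractAccumulators. AvgAcc and AvgFunctionSketch are hypothetical names.

```java
// Hypothetical accumulator for an average: running sum and element count.
class AvgAcc {
    long sum;
    long count;
}

// Standalone sketch mirroring the shape of a Table/SQL aggregate function.
class AvgFunctionSketch {

    AvgAcc createAccumulator() {
        return new AvgAcc();
    }

    void accumulate(AvgAcc acc, long value) {
        acc.sum += value;
        acc.count += 1;
    }

    /** Merges the given accumulators into acc, keeping what acc already holds. */
    void merge(AvgAcc acc, Iterable<AvgAcc> others) {
        for (AvgAcc other : others) {
            acc.sum += other.sum;
            acc.count += other.count;
        }
    }

    /**
     * Proposed inverse of merge: removes the contribution of each retracted accumulator.
     * acc is updated in place and never replaced or reset, as the contract requires.
     */
    void retractAccumulators(AvgAcc acc, Iterable<AvgAcc> retractAccs) {
        for (AvgAcc retracted : retractAccs) {
            acc.sum -= retracted.sum;
            acc.count -= retracted.count;
        }
    }

    /** Returns the average, or null when the accumulator is empty. */
    Double getValue(AvgAcc acc) {
        if (acc.count == 0) {
            return null;
        }
        return (double) acc.sum / acc.count;
    }
}
```

Aggregates such as sum, count, and average are straightforward to retract because their accumulators are invertible. An aggregate like max would need a richer accumulator (for example, one that tracks value counts) before retractAccumulators could be implemented correctly.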




--
This message was sent by Atlassian Jira
(v8.3.4#803005)
