Hi Vishal,

You are right: it is not possible to use state in an AggregateFunction
because windows need to be mergeable.
An AggregateFunction knows how to merge its accumulators but merging
generic state is not possible.
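
To illustrate what merging means here, below is a minimal plain-Java sketch.
It is not Flink code and not how Flink implements it internally; the names
SessionMergeSketch, SumCount, Session and addEvent are made up for this
example. It only shows that when a late event bridges two sessions, the
windows collapse into one and the only thing needed to combine their contents
is the accumulator's own merge method.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Flink dependency) of merging session windows. */
final class SessionMergeSketch {

    /** Hypothetical accumulator: a running sum and count that can merge. */
    static final class SumCount {
        long sum;
        long count;

        void add(long value) { sum += value; count++; }

        void merge(SumCount other) { sum += other.sum; count += other.count; }
    }

    /** A session window [start, end) together with its accumulator. */
    static final class Session {
        long start;
        long end;
        final SumCount acc = new SumCount();

        Session(long start, long end) { this.start = start; this.end = end; }

        /** Windows that overlap or touch are treated as one session. */
        boolean overlaps(Session o) { return start <= o.end && o.start <= end; }
    }

    /** Adds an event at time ts and merges all sessions it connects. */
    static void addEvent(List<Session> sessions, long ts, long value, long gap) {
        Session merged = new Session(ts, ts + gap);
        merged.acc.add(value);
        List<Session> remaining = new ArrayList<>();
        for (Session s : sessions) {
            if (s.overlaps(merged)) {
                // Merging windows requires merging their accumulators.
                merged.start = Math.min(merged.start, s.start);
                merged.end = Math.max(merged.end, s.end);
                merged.acc.merge(s.acc);
            } else {
                remaining.add(s);
            }
        }
        remaining.add(merged);
        sessions.clear();
        sessions.addAll(remaining);
    }
}
```

Arbitrary user state attached to one of the two original windows would have
no such merge method, which is the problem described above.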

I am not aware of an efficient and easy workaround for this.
If you want to use the provided session window logic, you can use a
WindowFunction that performs all computations when the window is triggered.
This means that aggregations do not happen eagerly and all events for a
window are collected and held in state.
Another approach could be to implement the whole logic (incl. the session
windowing) using a ProcessFunction. This would be a major effort though.
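
The trade-off of the WindowFunction approach can be sketched in plain Java
(again not Flink code; EagerVsBufferedSketch, EagerMax and BufferedMax are
hypothetical names, and "max" is just a stand-in for your aggregation). The
eager variant keeps one small accumulator per window, while the buffered
variant keeps every event and computes the result only when the window fires.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java comparison of eager aggregation and compute-on-trigger. */
final class EagerVsBufferedSketch {

    /** Eager: only a small accumulator is held per window. */
    static final class EagerMax {
        private long max = Long.MIN_VALUE;

        void add(long v) { max = Math.max(max, v); }

        long result() { return max; }
    }

    /** Buffered: every event is held until the window is triggered. */
    static final class BufferedMax {
        private final List<Long> events = new ArrayList<>();

        void add(long v) { events.add(v); }

        int bufferedCount() { return events.size(); }

        /** All computation happens here, at trigger time. */
        long result() {
            long max = Long.MIN_VALUE;
            for (long v : events) {
                max = Math.max(max, v);
            }
            return max;
        }
    }
}
```

Both variants produce the same result; the difference is how much is held per
window until it fires.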

Best,
Fabian

2017-12-06 3:52 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> It seems that this has to do with session windows that are mergeable? I
> tried the RichWindowFunction and that seems to suggest that one cannot use
> state? Any ideas, folks...
>
> On Dec 1, 2017 10:38 AM, "Vishal Santoshi" <vishal.santo...@gmail.com>
> wrote:
>
>> I have a simple aggregation with one caveat. For some reason I have to
>> keep a large amount of state until the window is GCed. The state is within
>> the accumulator (ACC). I am hitting a memory bottleneck and would like to
>> offload the state to the state backend (RocksDB), keeping the
>> between-checkpoint state in memory (seems to be an obvious fix). However, I
>> am not allowed to have a RichAggregateFunction in the aggregate method of a
>> windowed stream. That raises 2 questions:
>>
>> 1. Why?
>> 2. Is there an alternative for stateful window aggregation where we
>> manage the state?
>>
>> Thanks Vishal
>>
>>
>> Here is the code (it uses generics, but it works):
>>
>> SingleOutputStreamOperator<OUT> retVal = input
>>         .keyBy(keySelector)
>>         .window(EventTimeSessionWindows.withGap(gap))
>>         .aggregate(
>>                 new AggregateFunction<IN, ACC, OUT>() {
>>
>>                     @Override
>>                     public ACC createAccumulator() {
>>                         ACC newInstance = (ACC) accumulator.clone();
>>                         newInstance.resetLocal();
>>                         return newInstance;
>>                     }
>>
>>                     @Override
>>                     public void add(IN value, ACC accumulator) {
>>                         accumulator.add(value);
>>
>>                     }
>>
>>                     @Override
>>                     public OUT getResult(ACC accumulator) {
>>                         return accumulator.getLocalValue();
>>                     }
>>
>>                     @Override
>>                     public ACC merge(ACC a, ACC b) {
>>                         a.merge(b);
>>                         return a;
>>                     }
>>                 }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
>>                     @Override
>>                     public void apply(KEY s, TimeWindow window,
>>                             Iterable<OUT> input, Collector<OUT> out) throws Exception {
>>                         out.collect(input.iterator().next());
>>                     }
>>                 }, accType, aggregationResultType, aggregationResultType);
>>
>>
