I have a simple aggregation with one caveat: for some reason I have to keep
a large amount of state until the window is GCed. The state lives in the
accumulator (ACC). I am hitting a memory bottleneck and would like to
offload the state to the state backend (RocksDB), keeping only the state
between checkpoints in memory (this seems like an obvious fix). However, I am
not allowed to pass a RichAggregateFunction to the aggregate method of a
windowed stream. That raises two questions:

1. Why?
2. Is there an alternative for stateful window aggregation where we manage
the state ourselves?
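A minimal, framework-free sketch of what "managing the state ourselves" could look like for session windows. It assumes that events for each key arrive in timestamp order (so no out-of-order session merging is handled), that the aggregate is a simple sum, and that `onWatermark` plays the role of an event-time timer. The `TreeMap` stands in for keyed state; in Flink this would presumably be a `ValueState` or `MapState` inside a process function on a keyed stream, which a RocksDB backend keeps off-heap. All class, field, and method names here are hypothetical. The firing condition `watermark >= end - 1` mirrors a time window's max timestamp being `end - 1`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ManualSessionAggregation {

    /** Hypothetical per-key session state: window [start, end) and a running sum. */
    static final class Session {
        final long start;
        long end;
        long sum;

        Session(long start, long end, long sum) {
            this.start = start;
            this.end = end;
            this.sum = sum;
        }
    }

    private final long gap;
    // Stand-in for keyed state (assumption); TreeMap keeps emission order deterministic.
    private final Map<String, Session> state = new TreeMap<>();
    final List<String> emitted = new ArrayList<>();

    ManualSessionAggregation(long gap) {
        this.gap = gap;
    }

    /** Assumes events for a given key arrive in timestamp order. */
    void processElement(String key, long timestamp, long value) {
        Session s = state.get(key);
        if (s != null && timestamp < s.end) {
            // Within the gap: extend the session and fold the value in.
            s.end = Math.max(s.end, timestamp + gap);
            s.sum += value;
        } else {
            if (s != null) {
                emit(key, s); // the previous session is closed by this later event
            }
            state.put(key, new Session(timestamp, timestamp + gap, value));
        }
    }

    /** Plays the role of an event-time timer: fire closed sessions and clear their state. */
    void onWatermark(long watermark) {
        Iterator<Map.Entry<String, Session>> it = state.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Session> e = it.next();
            if (watermark >= e.getValue().end - 1) {
                emit(e.getKey(), e.getValue());
                it.remove(); // state is dropped once the session fires
            }
        }
    }

    private void emit(String key, Session s) {
        emitted.add(key + "[" + s.start + "," + s.end + ")=" + s.sum);
    }

    public static void main(String[] args) {
        ManualSessionAggregation agg = new ManualSessionAggregation(10);
        agg.processElement("a", 0, 1);
        agg.processElement("a", 5, 2);
        agg.processElement("b", 3, 7);
        agg.processElement("a", 30, 4);
        agg.onWatermark(100);
        System.out.println(agg.emitted); // prints [a[0,15)=3, a[30,40)=4, b[3,13)=7]
    }
}
```

The design choice here is that the state is cleared explicitly when the session fires, which is the part the window operator otherwise does for you when the window is GCed.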

Thanks,
Vishal


Here is the code (generic, but it works):

SingleOutputStreamOperator<OUT> retVal = input
        .keyBy(keySelector)
        .window(EventTimeSessionWindows.withGap(gap))
        .aggregate(
                new AggregateFunction<IN, ACC, OUT>() {

                    @Override
                    public ACC createAccumulator() {
                        ACC newInstance = (ACC) accumulator.clone();
                        newInstance.resetLocal();
                        return newInstance;
                    }

                    @Override
                    public void add(IN value, ACC accumulator) {
                        accumulator.add(value);

                    }

                    @Override
                    public OUT getResult(ACC accumulator) {
                        return accumulator.getLocalValue();
                    }

                    @Override
                    public ACC merge(ACC a, ACC b) {
                        a.merge(b);
                        return a;
                    }
                }, new WindowFunction<OUT, OUT, KEY, TimeWindow>() {
                    @Override
                    public void apply(KEY s, TimeWindow window,
                                      Iterable<OUT> input, Collector<OUT> out) throws Exception {
                        out.collect(input.iterator().next());
                    }
                }, accType, aggregationResultType, aggregationResultType);
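The accumulator contract the code above relies on can be illustrated without Flink. The sketch below uses a hypothetical `SimpleAccumulator` interface that only mirrors the five methods called above (`add`, `getLocalValue`, `resetLocal`, `merge`, `clone`); it is not Flink's own accumulator type. `ListAccumulator` is an assumed example that keeps every element, which shows why the ACC can grow large: its size is proportional to the number of events in the session. `merge` is the path taken when two session windows are merged.

```java
import java.util.ArrayList;

public class AccumulatorContractDemo {

    /** Hypothetical mirror of the methods the aggregate code calls on ACC. */
    interface SimpleAccumulator<V, R> extends Cloneable {
        void add(V value);
        R getLocalValue();
        void resetLocal();
        void merge(SimpleAccumulator<V, R> other);
        SimpleAccumulator<V, R> clone();
    }

    /** Collects every element, so its state grows with the number of events. */
    static final class ListAccumulator implements SimpleAccumulator<Long, ArrayList<Long>> {
        private ArrayList<Long> values = new ArrayList<>();

        @Override public void add(Long value) { values.add(value); }
        @Override public ArrayList<Long> getLocalValue() { return values; }
        @Override public void resetLocal() { values = new ArrayList<>(); }

        @Override public void merge(SimpleAccumulator<Long, ArrayList<Long>> other) {
            values.addAll(other.getLocalValue());
        }

        @Override public ListAccumulator clone() {
            ListAccumulator copy = new ListAccumulator();
            copy.values = new ArrayList<>(values); // deep enough: Long is immutable
            return copy;
        }
    }

    public static void main(String[] args) {
        ListAccumulator prototype = new ListAccumulator();
        // createAccumulator(): clone the prototype, then reset it
        SimpleAccumulator<Long, ArrayList<Long>> w1 = prototype.clone();
        w1.resetLocal();
        SimpleAccumulator<Long, ArrayList<Long>> w2 = prototype.clone();
        w2.resetLocal();
        // add(): fold events into each session's accumulator
        w1.add(1L);
        w1.add(2L);
        w2.add(3L);
        // merge(): when two sessions merge, b is folded into a
        w1.merge(w2);
        System.out.println(w1.getLocalValue()); // prints [1, 2, 3]
    }
}
```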
