> public AverageAccumulator merge(WindowStats a, WindowStats b) {
> should be:
> public WindowStats merge(WindowStats a, WindowStats b) {
> I see. Then yes, a fold operation would be more efficient here.
> However, can you give an idea of how to use `AggregateFunction` in the latest
> Flink to replace the following fold function?
> Sure! The documentation for 1.3 is still lagging behind a bit for some of
> the new features, but the Javadoc for `AggregateFunction` should be fairly
> self-explanatory.
> As a quick sketch, here’s what you would do to achieve the same thing:
> import org.apache.flink.api.common.functions.AggregateFunction;
>
> // IN and OUT stand for your input element type and your result type
> public class WindowStatsAggregator
>         implements AggregateFunction<IN, WindowStats, OUT> {
>     public WindowStats createAccumulator() {
>         return new WindowStats();
>     }
>     public WindowStats merge(WindowStats a, WindowStats b) {
>         // merge the two unique products maps in your WindowStats,
>         // e.g. add b's entries into a, then return the merged accumulator
>         return a;
>     }
>     public void add(IN value, WindowStats acc) {
>         // update your unique products map
>     }
>     public OUT getResult(WindowStats acc) {
>         return acc.getMap();
>     }
> }
> As you can see, the `AggregateFunction` is more generic, and should
> subsume whatever you were previously doing with fold.
> Your previous `WindowStats` class is basically the state accumulator, and
> you need to implement how to update it, merge two accumulators, and
> retrieve the final accumulated result.
> For more info, I would point to the class Javadocs of `AggregateFunction`.
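To make the update/merge/retrieve lifecycle concrete without a Flink dependency, here is a minimal plain-Java sketch. It declares a local `Aggregator` interface that only mirrors the method shape of the 1.3-era `AggregateFunction` quoted above; it is not Flink's interface. The `WindowStats` here is a hypothetical stand-in that keeps per-product event counts, with product IDs as the input type.

```java
import java.util.HashMap;
import java.util.Map;

public class WindowStatsAggregatorSketch {

    // Local stand-in mirroring the method shape of the AggregateFunction sketch
    // above (void add, as in that sketch); NOT Flink's interface itself.
    interface Aggregator<IN, ACC, OUT> {
        ACC createAccumulator();
        void add(IN value, ACC accumulator);
        OUT getResult(ACC accumulator);
        ACC merge(ACC a, ACC b);
    }

    // Hypothetical accumulator: event count per product ID.
    static class WindowStats {
        final Map<String, Long> counts = new HashMap<>();
    }

    // IN = product ID, OUT = a copy of the per-product counts.
    static class WindowStatsAggregator
            implements Aggregator<String, WindowStats, Map<String, Long>> {

        public WindowStats createAccumulator() {
            return new WindowStats();
        }

        public void add(String productId, WindowStats acc) {
            // One map entry per distinct product, however many events arrive.
            acc.counts.merge(productId, 1L, Long::sum);
        }

        public Map<String, Long> getResult(WindowStats acc) {
            return new HashMap<>(acc.counts);
        }

        public WindowStats merge(WindowStats a, WindowStats b) {
            // Add b's counts into a and reuse a as the merged accumulator.
            b.counts.forEach((id, n) -> a.counts.merge(id, n, Long::sum));
            return a;
        }
    }

    public static void main(String[] args) {
        WindowStatsAggregator agg = new WindowStatsAggregator();
        WindowStats left = agg.createAccumulator();
        WindowStats right = agg.createAccumulator();
        for (String id : new String[] {"p1", "p2", "p1"}) agg.add(id, left);
        for (String id : new String[] {"p2", "p3"}) agg.add(id, right);
        Map<String, Long> result = agg.getResult(agg.merge(left, right));
        // prints "2 2 1"
        System.out.println(result.get("p1") + " " + result.get("p2") + " " + result.get("p3"));
    }
}
```

Reusing `a` as the merge target avoids allocating a third map; that is a design choice of this sketch, not a requirement.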
> Thanks for the details. I am using fold to process events and maintain
> statistics per product ID within a WindowStats instance. So fold is much
> more efficient, because there can be millions of events but there will be
> fewer than 50k unique products. If I use a generic window function instead,
> it will be less efficient, because the window function will receive a
> collection of millions of events, and those events are replicated, since
> Flink stores a copy of each event for every sliding window it belongs to.
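The replication cost can be worked out with a little arithmetic: a sliding window assigner places each element in size/slide windows (when the size is a multiple of the slide). The plain-Java sketch below re-implements that assignment rule for illustration only, assuming a zero window offset; it is not Flink's code. The event and product volumes are hypothetical figures chosen to match the "millions of events, about 50k products" description.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowFanOut {

    // Start times of every sliding window [start, start + size) that contains ts,
    // assuming windows are aligned to the epoch (zero offset).
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long size = 60 * minute;   // 1-hour window
        long slide = 5 * minute;   // 5-minute slide
        long ts = 1_000_000_000L;  // an arbitrary event timestamp in ms

        int copies = windowStarts(ts, size, slide).size(); // size / slide = 12

        // Hypothetical volumes, for illustration only.
        long events = 1_000_000L;
        long uniqueProducts = 50_000L;

        // Buffering raw events for a full window function: one copy per window.
        long bufferedElements = events * copies;
        // Incremental fold/aggregate: at most one map entry per product per window.
        long accumulatorEntries = uniqueProducts * copies;

        // prints "12 12000000 600000"
        System.out.println(copies + " " + bufferedElements + " " + accumulatorEntries);
    }
}
```

With the 24-hour window and 5-minute slide from the original question, each element falls into 24 * 60 / 5 = 288 windows, so the gap between buffering raw events and keeping one accumulator per window grows accordingly.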
> However, can you give an idea of how to use `AggregateFunction` in the latest
> Flink to replace the following fold function?
> final DataStream<WindowStats> eventStream = inputStream
>         .window(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(5)))
>         .fold(new WindowStats(), new ProductAggregationMapper(),
>               new ProductAggregationWindowFunction());
>> Yes, that is correct. The aggregated fold value (i.e. your WindowStats
>> instance) will be checkpointed by Flink as managed state, and restored from
>> the last complete checkpoint in case of failures.
>> One comment on using the fold function: if what you’re essentially doing
>> in the fold is just collecting the elements of the windows per key and
>> performing the actual aggregation in the window function, then you don't
>> need the fold.
>> A generic window function should suit that case. See [1].
>> [1]
>> 1.3/dev/windows.html#windowfunction---the-generic-case
>> Hi All,
>> I am collecting millions of events per 24 hours for N products, where N
>> can be 50k. I use the following fold mechanism with a sliding
>> window:
>> final DataStream<WindowStats> eventStream = inputStream
>>         .window(SlidingProcessingTimeWindows.of(Time.hours(24), Time.minutes(5)))
>>         .fold(new WindowStats(), new ProductAggregationMapper(),
>>               new ProductAggregationWindowFunction());
>> In the WindowStats class, I keep a HashMap<String, ProductMetric> (product
>> ID to ProductMetric) which holds each product's event count and various
>> other metrics. So for 50k products I will have 50k entries in the map
>> within the WindowStats instance, instead of millions of events, because
>> the fold function processes each event as it arrives.
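The bounded-state argument can be checked with a small plain-Java sketch of a fold-style step, shaped as (accumulator, value) -> accumulator. `Event`, its fields, and the `valueSum` metric are hypothetical: the original does not say what the "other metrics" are, so a running sum stands in for them. This is an illustration of the idea, not the poster's `ProductAggregationMapper`.

```java
import java.util.HashMap;
import java.util.Map;

public class FoldStateSizeSketch {

    // Hypothetical input event: a product ID plus one numeric field.
    static class Event {
        final String productId;
        final double value;

        Event(String productId, double value) {
            this.productId = productId;
            this.value = value;
        }
    }

    // Hypothetical per-product metrics; valueSum stands in for "other metrics".
    static class ProductMetric {
        long eventCount;
        double valueSum;
    }

    static class WindowStats {
        final Map<String, ProductMetric> metrics = new HashMap<>();
    }

    // Fold-style step: update this product's entry in place and return the accumulator.
    static WindowStats fold(WindowStats acc, Event e) {
        ProductMetric m = acc.metrics.computeIfAbsent(e.productId, id -> new ProductMetric());
        m.eventCount++;
        m.valueSum += e.value;
        return acc;
    }

    public static void main(String[] args) {
        WindowStats stats = new WindowStats();
        // 1,000,000 synthetic events spread over 3 product IDs.
        for (int i = 0; i < 1_000_000; i++) {
            stats = fold(stats, new Event("p" + (i % 3), 1.0));
        }
        // The map holds one entry per product, not one per event: prints "3 333334"
        System.out.println(stats.metrics.size() + " " + stats.metrics.get("p0").eventCount);
    }
}
```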
>> My question is: if I call env.enableCheckpointing(1000), will the
>> WindowStats instance for each existing window automatically be
>> checkpointed and restored on recovery? If not, how can I better implement
>> the above use case so that the state of the WindowStats object within the
>> fold operation is stored?
