Hi all,

I’m currently calculating a moving average with DataStreams via:

            .keyBy(new XXXKeySelector())
            .window(GlobalWindows.create())
            .trigger(CountTrigger.of(1))
            .aggregate(new MovingAverageAggregator(10))

MovingAverageAggregator uses a MovingAverageAccumulator to do all of the real 
work.
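
For context, here is a minimal sketch of what such an accumulator might look like. The real MovingAverageAccumulator is not shown in this thread, so the fields, the method names (add, getResult) and the choice to average over the last N values are all assumptions made for illustration. It is plain Java with no Flink dependency. It implements Serializable only so that it can be written out as state; how Flink would actually serialize it depends on the type information it derives for the class.

```java
import java.io.Serializable;
import java.util.ArrayDeque;

// Hypothetical sketch: averages the most recent `windowSize` values.
// Not the actual class from this thread.
class MovingAverageAccumulator implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int windowSize;
    private final ArrayDeque<Double> values = new ArrayDeque<>();
    private double sum = 0.0;

    MovingAverageAccumulator(int windowSize) {
        if (windowSize <= 0) {
            throw new IllegalArgumentException("windowSize must be positive");
        }
        this.windowSize = windowSize;
    }

    // Add a value, evicting the oldest one once the window is full.
    void add(double value) {
        values.addLast(value);
        sum += value;
        if (values.size() > windowSize) {
            sum -= values.removeFirst();
        }
    }

    // Average of the values currently held; 0.0 if nothing has been added.
    double getResult() {
        return values.isEmpty() ? 0.0 : sum / values.size();
    }
}
```

An AggregateFunction wrapper would then only need to delegate createAccumulator(), add() and getResult() to this class.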

But if I want to use AggregatingState to support checkpointing, it seems like 
I’d have to create a new RichFunction (e.g. MovingAverageFunction) that 
implements CheckpointedFunction, creates the AggregatingState in its 
initializeState() method, and then the stream creation becomes:

            .keyBy(new XXXKeySelector())
            .map(new MovingAverageFunction(10))

Is that right? It seems kind of…awkward to have a function whose state is 
created using a descriptor that takes an aggregator, which in turn has an 
accumulator.

Also, is the state in this case the serialized accumulator object?

Thanks,

— Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
