Hi all, I’m currently calculating a moving average with DataStreams via:

    .keyBy(new XXXKeySelector())
    .window(GlobalWindows.create())
    .trigger(CountTrigger.of(1))
    .aggregate(new MovingAverageAggregator(10))

MovingAverageAggregator uses a MovingAverageAccumulator to do all of the real work.

But if I want to use AggregatingState to support checkpointing, it seems like I’d have to create a new RichFunction (e.g. MovingAverageFunction) that implements CheckpointedFunction and creates the AggregatingState in its initializeState() method. The stream creation would then be:

    .keyBy(new XXXKeySelector())
    .map(new MovingAverageFunction(10))

Yes?

It seems kind of…awkward to have a function with state that is created using a descriptor, which takes an aggregator, which in turn has an accumulator.

Also, is the state in this case the serialized accumulator object?

Thanks,

-- Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
Custom big data solutions & training
Flink, Solr, Hadoop, Cascading & Cassandra
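
P.S. To make the aggregator/accumulator split above concrete, here is a minimal plain-Java sketch. It deliberately has no Flink dependency, and the class bodies are assumptions made up for illustration (the real MovingAverageAggregator and MovingAverageAccumulator are not shown in this message). The method names only mirror the createAccumulator()/add()/getResult() shape of Flink's AggregateFunction; merge() is left out.

```java
import java.io.Serializable;
import java.util.ArrayDeque;

/** Hypothetical accumulator: keeps the last `size` values plus their running sum. */
class MovingAverageAccumulator implements Serializable {
    private static final long serialVersionUID = 1L;

    private final int size;
    private final ArrayDeque<Double> values = new ArrayDeque<>();
    private double sum = 0.0;

    MovingAverageAccumulator(int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        this.size = size;
    }

    /** Adds a value and evicts the oldest one once more than `size` are held. */
    void add(double value) {
        values.addLast(value);
        sum += value;
        if (values.size() > size) {
            sum -= values.removeFirst();
        }
    }

    /** Average of the values currently held, or 0.0 if none have been added. */
    double getAverage() {
        return values.isEmpty() ? 0.0 : sum / values.size();
    }
}

/** Hypothetical aggregator: a thin wrapper that delegates all work to the accumulator. */
class MovingAverageAggregator {
    private final int size;

    MovingAverageAggregator(int size) {
        this.size = size;
    }

    MovingAverageAccumulator createAccumulator() {
        return new MovingAverageAccumulator(size);
    }

    MovingAverageAccumulator add(double value, MovingAverageAccumulator acc) {
        acc.add(value);
        return acc;
    }

    double getResult(MovingAverageAccumulator acc) {
        return acc.getAverage();
    }
}
```

In this sketch the accumulator is the only object that changes per element, which is why I would guess it is the thing that ends up in state. Serializable here is just a placeholder for whatever serialization Flink would actually apply.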