Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Fabian Hueske
This works with event time as well. You need to set the right TimeCharacteristic on the exec env and assign timestamps + watermarks. The only time-dependent operation is the window. YourWindowFunction assigns the timestamp of the window. WindowFunction.apply() has a TimeWindow parameter that gives
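
A rough sketch of the event-time setup described in this reply, assuming the Flink 1.x Scala DataStream API that was current at the time of the thread. The names SumWithWindowEnd and EventTimeSketch, the sample records, and the one-minute window size are made up for illustration and do not come from the thread; using the window end as the result timestamp is likewise one possible choice, not necessarily what the author had in mind.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Hypothetical window function: sums the count updates of one state and
// tags the result with the end timestamp of the window it belongs to.
class SumWithWindowEnd
    extends WindowFunction[(Int, Int), (Int, Int, Long), Int, TimeWindow] {
  override def apply(state: Int,
                     window: TimeWindow,
                     input: Iterable[(Int, Int)],
                     out: Collector[(Int, Int, Long)]): Unit =
    out.collect((state, input.map(_._2).sum, window.getEnd))
}

object EventTimeSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Switch the job from the default processing time to event time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Made-up sample records: (state, cntUpdate, eventTimeMillis).
    // assignAscendingTimestamps assumes timestamps never decrease.
    val updates = env
      .fromElements((0, 1, 1000L), (0, 1, 2000L), (1, 1, 61000L))
      .assignAscendingTimestamps(_._3)

    updates
      .map(u => (u._1, u._2))          // keep (state, cntUpdate)
      .keyBy(_._1)                     // key by state
      .timeWindow(Time.minutes(1))     // event-time tumbling window
      .apply(new SumWithWindowEnd)
      .print()

    env.execute("event-time sketch")
  }
}
```

For out-of-order input, assignAscendingTimestamps would have to be replaced by a timestamp and watermark assigner that tolerates lateness.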

Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Simone Robutti
I'm working with your suggestions, thank you very much. What I'm missing here is what YourWindowFunction should do. I have no notion of event time there and so I can't assign a timestamp. Also this solution seems to be working by processing time, while I care about event time. I couldn't make it

Re: Counting latest state of stateful entities in streaming

2016-09-30 Thread Fabian Hueske
Hi Simone, I think I have a solution for your problem:

val s: DataStream[(Long, Int, ts)] = ??? // (id, state, time)
val stateChanges: DataStream[(Int, Int)] = s // (state, cntUpdate)
  .keyBy(_._1) // key by id
  .flatMap(new StateUpdater) // StateUpdater is a stateful FlatMapFunction. It has
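
The preview above is cut off before it says what StateUpdater does. The following is only a guess at the idea, written as plain Scala without Flink so it can be run on its own: it assumes that the function remembers the last state per id and, on every event, emits a -1 update for the previous state (if any) and a +1 update for the new one. StateCountSketch, stateChanges and countsPerState are hypothetical names, and timestamps are left out for brevity.

```scala
import scala.collection.mutable

object StateCountSketch {
  // Turn (id, state) events into (state, cntUpdate) records.
  // Assumption: an id moving to a new state retracts its old state first.
  def stateChanges(events: Seq[(Long, Int)]): Seq[(Int, Int)] = {
    val lastState = mutable.Map.empty[Long, Int] // stands in for keyed state
    events.flatMap { case (id, state) =>
      val retract = lastState.get(id).map(old => (old, -1)).toList
      lastState(id) = state
      retract :+ ((state, 1))
    }
  }

  // Sum the updates per state: how many ids are currently in each state.
  def countsPerState(changes: Seq[(Int, Int)]): Map[Int, Int] =
    changes.groupBy(_._1).map { case (s, cs) => s -> cs.map(_._2).sum }
}
```

For example, feeding the events (1, 0), (2, 0), (1, 1) through stateChanges and then countsPerState leaves one id in state 0 and one id in state 1. In a real job the mutable map would be replaced by Flink keyed state, and the summing step by a keyed window or running aggregate.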