Thank you all for your responses. I've created a custom trigger similar to the Flink-provided EventTimeTrigger, with a few changes: it fires in onElement() and does not fire in onEventTime(). This satisfies my requirement that whenever a new event arrives, the incremental result (the result of AggregateFunction#add()) is emitted immediately. The changed code block is below.

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.FIRE; // instead of CONTINUE
    }
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ?
        TriggerResult.CONTINUE : // instead of FIRE
        TriggerResult.CONTINUE;
}

Thanks,
Chandu

On Sun, Dec 8, 2019 at 8:22 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Chandu,
>
> Maybe you can use a custom trigger:
>     .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))
>
> This would continuously trigger your aggregate every period of time.
>
> Thanks,
> Rafi
>
>
> On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin <azagre...@apache.org> wrote:
>
>> Hi Chandu,
>>
>> I am not sure whether using the windowing API is helpful in this case at all.
>>
>> At least, you could try to consume the data not only by windowing but also by a custom stateful function.
>> You could look into the AggregatingState [1]. Then you could do whatever you want with the current aggregated value.
>> If you still need to do something with the result of windowing, you could do it as now or simulate it with timers [2] in that same stateful function.
>>
>> Best,
>> Andrey
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
>>
>> On Tue, Dec 3, 2019 at 12:21 AM chandu soa <chandu...@gmail.com> wrote:
>>
>>> *Emit intermediate accumulator (AggregateFunction ACC value) state of a session window when a new event arrives*
>>>
>>> AggregateFunction#getResult() is called only when the window completes.
>>> My need is to emit the intermediate accumulator values (the result of AggregateFunction#add()) as well and write them to the sink. Both AggregateFunction#getResult() and ProcessWindowFunction provide the aggregated result only when the window is closed.
>>>
>>> *Any thoughts, please, on how to emit or stream the intermediate accumulator state as soon as a new event arrives while the window is open? Do I need to implement a custom trigger or assigner?*
>>>
>>> To give you some background: when a user watches a video, we get events when the video is clicked, thereafter every ~15 minutes, and finally when the user closes the video.
>>>
>>> I need to aggregate them as soon as they arrive and post the result to the destination. For example, if a user is watching a two-hour movie, I get events at 15-minute intervals (0, 15, 30, ..., 120); whenever I get an event, I need to aggregate the watched percentage so far and write it to the sink (0%, 12.5%, 25%, ..., 100%). The implementation below emits (via getResult()) a single event 20 minutes after the video has been watched.
>>>
>>> .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
>>> .aggregate(new EventAggregator())
>>> .filter(new FinalFilter())
>>> .addSink(...)
>>>
>>> Appreciate your help.
>>>
>>> Thanks,
>>> chandu
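
For anyone following the thread, the per-event output described in the original question (0%, 12.5%, 25%, ..., 100% for a two-hour movie) can be sketched without Flink as follows. This is only an illustration of "emit after every add()", not the actual EventAggregator from the job. The class and method names are hypothetical, and the rule "watched percentage = furthest position seen / video duration" is an assumption, since the real aggregation logic was not posted.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch; all names here are hypothetical and not part of the Flink API.
class WatchProgressSketch {

    // Plays the role of the ACC type of an AggregateFunction.
    static final class Acc {
        long maxPositionMinutes = 0;
    }

    // Counterpart of AggregateFunction#add(): fold one event into the accumulator.
    // Assumption: each event carries the playback position in minutes.
    static Acc add(long positionMinutes, Acc acc) {
        acc.maxPositionMinutes = Math.max(acc.maxPositionMinutes, positionMinutes);
        return acc;
    }

    // Counterpart of AggregateFunction#getResult(): watched percentage so far.
    static double getResult(Acc acc, long durationMinutes) {
        return 100.0 * acc.maxPositionMinutes / durationMinutes;
    }

    // Mimics a trigger that returns FIRE from onElement(): one result per event,
    // instead of a single result when the window closes.
    static List<Double> emitOnEveryEvent(long[] positionsMinutes, long durationMinutes) {
        Acc acc = new Acc();
        List<Double> emitted = new ArrayList<>();
        for (long position : positionsMinutes) {
            add(position, acc);
            emitted.add(getResult(acc, durationMinutes));
        }
        return emitted;
    }

    public static void main(String[] args) {
        long[] events = {0, 15, 30, 45, 60, 75, 90, 105, 120};
        System.out.println(emitOnEveryEvent(events, 120));
    }
}
```

With the nine events from the example and a 120-minute duration, the list holds one percentage per event, starting at 0.0 and ending at 100.0. In the real job the same effect comes from the trigger returning FIRE in onElement(), which makes the window emit getResult() on the current accumulator each time.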