Hi Chandu,

Maybe you can use a custom trigger:

    .trigger(ContinuousEventTimeTrigger.of(Time.minutes(15)))

This would fire the window repeatedly, once per interval (here every 15
minutes of event time), emitting the current aggregate each time.

Thanks,
Rafi

On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin <azagre...@apache.org> wrote:

> Hi Chandu,
>
> I am not sure whether using the windowing API is helpful in this case at
> all.
>
> At least, you could try to consume the data not only by windowing but also
> by a custom stateful function.
> You could look into the AggregatingState [1]. Then you could do whatever
> you want with the current aggregated value.
> If you still need to do something with the result of windowing, you could
> do it as now or simulate it with timers [2] in that same stateful function.
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
>
> On Tue, Dec 3, 2019 at 12:21 AM chandu soa <chandu...@gmail.com> wrote:
>
>> *Emit intermediate accumulator (AggregateFunction ACC value) state of a
>> session window when a new event arrives*
>>
>> AggregateFunction#getResult() is called only when the window completes.
>> I need to emit the intermediate accumulator values (the result of
>> AggregateFunction#add()) as well and write them to the sink. Both
>> AggregateFunction#getResult() and ProcessWindowFunction provide the
>> aggregated result only when the window is closed.
>>
>> *Any thoughts on how to emit or stream the intermediate accumulator
>> state as soon as a new event arrives while the window is open? Do I need
>> to implement a custom trigger or assigner?*
>>
>> To give you some background, when a user watches a video we get events:
>> when they click play, every ~15 minutes thereafter, and finally when the
>> user closes the video.
>>
>> I need to aggregate them as soon as they arrive and post the result to
>> the destination.
>> For example, if a user is watching a two-hour movie, I get events
>> at 15-minute intervals (0, 15, 30, ..., 120); whenever I get an event I
>> need to aggregate the watched percentage so far and write it to the sink
>> (0%, 12.5%, 25%, ..., 100%). The implementation below emits (via
>> getResult()) a single event 20 minutes after the video has been watched.
>>
>> .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
>> .aggregate(new EventAggregator())
>> .filter(new FinalFilter())
>> .addSink(...)
>>
>> Appreciate your help.
>>
>> Thanks,
>> chandu
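
To make the expected output from the quoted question concrete, here is a minimal plain-Java sketch of the per-event aggregation it describes. It has no Flink dependency and is not the poster's EventAggregator. The class name WatchProgress, the Acc type, and the assumption that each event carries the playback position in minutes are all hypothetical. The add/getResult methods only mirror the shape of AggregateFunction#add() and AggregateFunction#getResult(); the point is that a result is emitted after every add() rather than once at the end.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration only (no Flink); every name here is hypothetical. */
public class WatchProgress {

    /** Accumulator: the furthest playback position seen so far, in minutes. */
    static final class Acc {
        int maxPositionMinutes = 0;
    }

    /** Analogue of AggregateFunction#add(): fold one event into the accumulator. */
    static Acc add(int positionMinutes, Acc acc) {
        acc.maxPositionMinutes = Math.max(acc.maxPositionMinutes, positionMinutes);
        return acc;
    }

    /** Analogue of AggregateFunction#getResult(): watched percentage so far. */
    static double getResult(Acc acc, int durationMinutes) {
        return 100.0 * acc.maxPositionMinutes / durationMinutes;
    }

    /** Emits one intermediate result per incoming event instead of a single final one. */
    static List<Double> percentagesSoFar(int durationMinutes, int[] eventPositions) {
        Acc acc = new Acc();
        List<Double> emitted = new ArrayList<>();
        for (int position : eventPositions) {
            add(position, acc);
            emitted.add(getResult(acc, durationMinutes));
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Assumed input: a 120-minute movie with an event every 15 minutes.
        int[] positions = {0, 15, 30, 45, 60, 75, 90, 105, 120};
        // Prints [0.0, 12.5, 25.0, 37.5, 50.0, 62.5, 75.0, 87.5, 100.0]
        System.out.println(percentagesSoFar(120, positions));
    }
}
```

In a Flink job the same "emit after every add" behaviour would have to come from the trigger or from a keyed stateful function, as suggested in the replies above; this sketch only pins down what the sink should receive.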