Hi Chandu,

I am not sure whether using the windowing API is helpful in this case at
all.

As an alternative, you could try consuming the data with a custom stateful
function, either in addition to the window or instead of it.
You could look into AggregatingState [1]. It lets you read the current
aggregated value on every event, so you can do whatever you want with it.
If you still need the result of the window, you could keep the window as
it is now or simulate it with timers [2] in that same stateful function.
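
To illustrate the idea, here is a minimal sketch in plain Java of the per-key
logic such a stateful function might carry out. It deliberately uses no Flink
classes: the HashMap stands in for keyed state and expiresAt stands in for an
event-time timer. The names RunningWatchAggregator, onEvent, onTimer and the
minutesWatched argument are hypothetical, and I am assuming each event carries
the minutes watched since the previous one.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-key state plus a session-gap timer (not Flink API). */
class RunningWatchAggregator {

    /** State kept per key: the running sum and the time the session expires. */
    private static final class Session {
        double watchedMinutes;
        long expiresAt;
    }

    private final Map<String, Session> sessions = new HashMap<>();
    private final long gapMillis;
    private final double videoMinutes;

    RunningWatchAggregator(long gapMillis, double videoMinutes) {
        this.gapMillis = gapMillis;
        this.videoMinutes = videoMinutes;
    }

    /** Adds one event and returns the watched percentage so far for that key. */
    double onEvent(String key, long eventTimeMillis, double minutesWatched) {
        Session s = sessions.get(key);
        if (s == null || eventTimeMillis >= s.expiresAt) {
            // First event for this key, or the gap has elapsed: start a new session.
            s = new Session();
            sessions.put(key, s);
        }
        s.watchedMinutes += minutesWatched;
        // Push the expiry forward, as re-registering a timer would.
        s.expiresAt = eventTimeMillis + gapMillis;
        return 100.0 * s.watchedMinutes / videoMinutes;
    }

    /** Models a timer firing: returns the final percentage if the session expired, else null. */
    Double onTimer(String key, long nowMillis) {
        Session s = sessions.get(key);
        if (s == null || nowMillis < s.expiresAt) {
            return null; // no session, or a newer event moved the expiry
        }
        sessions.remove(key); // clear state so the next event starts from zero
        return 100.0 * s.watchedMinutes / videoMinutes;
    }
}
```

With a 20-minute gap and a 120-minute video, calling onEvent for each incoming
event yields the running percentage immediately, while onTimer plays the role
of the window closing. In a real job the map and the expiry would be replaced
by keyed state and registered timers.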

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example

On Tue, Dec 3, 2019 at 12:21 AM chandu soa <chandu...@gmail.com> wrote:

> *Emit intermediate accumulator (AggregateFunction ACC value) state of a
> session window when a new event arrives*
>
> AggregateFunction#getResult() is called only when the window completes. I
> need to emit the intermediate accumulator values (the result of
> AggregateFunction#add()) as well and write them to a sink. Both
> AggregateFunction#getResult() and ProcessWindowFunction provide the
> aggregated result only when the window is closed.
>
> *Any thoughts on how to emit or stream the intermediate accumulator state
> as soon as a new event arrives while the window is open? Do I need to
> implement a custom trigger or assigner?*
>
>
>
> To give you some background: when a user watches a video we get events
> when the video is clicked, every ~15 minutes thereafter, and finally when
> the user closes the video.
>
> I need to aggregate them as soon as they arrive and post the result to the
> destination. For example, if a user is watching a two-hour movie, I get
> events at 15-minute intervals (0, 15, 30, ..., 120). Whenever I get an
> event, I need to aggregate the watched percentage so far and write it to
> the sink (0%, 12.5%, 25%, ..., 100%). The implementation below emits
> (getResult()) a single event 20 minutes after the video has been watched.
>
> .window(EventTimeSessionWindows.withGap(Time.minutes(20)))
> .aggregate(new EventAggregator())
> .filter(new FinalFilter())
> .addSink(...)
>
> Appreciate your help.
>
>
> Thanks,
>
> chandu
>
