Thank you all for your responses.

I've created a custom trigger similar to flink provided EventTimeTrigger,
with few changes. Fire event on onElement(), and do not fire event on
onEventTime() to satisfy my requirement - whenever new event arrives fire
incremental result(result of AggregateFunction#add()) immediately. Find
below changed code block.

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow
window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.*FIRE*; // instead of CONTINUE
}
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.*CONTINUE* : // instead of FIRE
TriggerResult.CONTINUE;
}

Thanks,
Chandu


On Sun, Dec 8, 2019 at 8:22 AM Rafi Aroch <rafi.ar...@gmail.com> wrote:

> Hi Chandu,
>
> Maybe you can use a custom trigger:
> *     .trigger(**ContinuousEventTimeTrigger.of(Time.minutes(15)))*
>
> This would continuously trigger your aggregate every period of time.
>
> Thanks,
> Rafi
>
>
> On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin <azagre...@apache.org>
> wrote:
>
>> Hi Chandu,
>>
>> I am not sure whether using the windowing API is helpful in this case at
>> all.
>>
>> At least, you could try to consume the data not only by windowing but
>> also by a custom stateful function.
>> You look into the AggregatingState [1]. Then you could do whatever you
>> want with the current aggregated value.
>> If you still need to do something with the result of windowing, you could
>> do it as now or simulate it with timers [2] in that same stateful function.
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
>>
>> On Tue, Dec 3, 2019 at 12:21 AM chandu soa <chandu...@gmail.com> wrote:
>>
>>> *Emit intermediate accumulator(AggregateFunction ACC value) state of a
>>> session window when new event arrives*
>>>
>>>
>>>
>>> AggregateFunction#getResults() is called only when window completes. My
>>> need is emit intermediate accumulator values(result of
>>> AggregateFunction#add()) as well and write them to Sink. Both
>>> AggregateFunction#getResult() and ProcessWindowFunction() provides
>>> aggregated result, only when the window is closed.
>>>
>>> *Any thoughts please, how to emit or stream intermediate accumulator
>>> state as soon as new event arrive when window is open? Need to implement
>>> custom trigger or Assigner?*
>>>
>>>
>>>
>>> To give you some background, when user watches a video we get events -
>>> when clicked, thereafter every ~ 15minutes, and finally when user close the
>>> video.
>>>
>>> I need to aggregate them as soon as they arrive and post it to
>>> destination. For example, if user watching a two-hour movie I get events
>>> for 15 min interval(0,15,30,...,120), whenever I get a event need to
>>> aggregate watched percentage so far and write it to sink(0%, 12.5%,
>>> 25%,...,100%). The below implementation emitting(getResult()) a single
>>> event 20 minutes after watching a video.
>>>
>>>
>>>
>>>
>>>
>>> .window(*EventTimeSessionWindows.withGap(Time.minutes(20))*)
>>>
>>>
>>> .aggregate(new EventAggregator())
>>>
>>>
>>> .filter(new FinalFilter())
>>>
>>>
>>> .addSink(...)
>>>
>>>
>>> Appreciate your help.
>>>
>>>
>>> Thanks,
>>>
>>> chandu
>>>
>>

Reply via email to