Re: Emit intermediate accumulator state of a session window

2019-12-09 Thread chandu soa
Thank you all for your responses.

I've created a custom trigger similar to flink provided EventTimeTrigger,
with few changes. Fire event on onElement(), and do not fire event on
onEventTime() to satisfy my requirement - whenever new event arrives fire
incremental result(result of AggregateFunction#add()) immediately. Find
below changed code block.

@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow
window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.*FIRE*; // instead of CONTINUE
}
}

@Override
public TriggerResult onEventTime(long time, TimeWindow window,
TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.*CONTINUE* : // instead of FIRE
TriggerResult.CONTINUE;
}

Thanks,
Chandu


On Sun, Dec 8, 2019 at 8:22 AM Rafi Aroch  wrote:

> Hi Chandu,
>
> Maybe you can use a custom trigger:
> * .trigger(**ContinuousEventTimeTrigger.of(Time.minutes(15)))*
>
> This would continuously trigger your aggregate every period of time.
>
> Thanks,
> Rafi
>
>
> On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin 
> wrote:
>
>> Hi Chandu,
>>
>> I am not sure whether using the windowing API is helpful in this case at
>> all.
>>
>> At least, you could try to consume the data not only by windowing but
>> also by a custom stateful function.
>> You look into the AggregatingState [1]. Then you could do whatever you
>> want with the current aggregated value.
>> If you still need to do something with the result of windowing, you could
>> do it as now or simulate it with timers [2] in that same stateful function.
>>
>> Best,
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
>>
>> On Tue, Dec 3, 2019 at 12:21 AM chandu soa  wrote:
>>
>>> *Emit intermediate accumulator(AggregateFunction ACC value) state of a
>>> session window when new event arrives*
>>>
>>>
>>>
>>> AggregateFunction#getResults() is called only when window completes. My
>>> need is emit intermediate accumulator values(result of
>>> AggregateFunction#add()) as well and write them to Sink. Both
>>> AggregateFunction#getResult() and ProcessWindowFunction() provides
>>> aggregated result, only when the window is closed.
>>>
>>> *Any thoughts please, how to emit or stream intermediate accumulator
>>> state as soon as new event arrive when window is open? Need to implement
>>> custom trigger or Assigner?*
>>>
>>>
>>>
>>> To give you some background, when user watches a video we get events -
>>> when clicked, thereafter every ~ 15minutes, and finally when user close the
>>> video.
>>>
>>> I need to aggregate them as soon as they arrive and post it to
>>> destination. For example, if user watching a two-hour movie I get events
>>> for 15 min interval(0,15,30,...,120), whenever I get a event need to
>>> aggregate watched percentage so far and write it to sink(0%, 12.5%,
>>> 25%,...,100%). The below implementation emitting(getResult()) a single
>>> event 20 minutes after watching a video.
>>>
>>>
>>>
>>>
>>>
>>> .window(*EventTimeSessionWindows.withGap(Time.minutes(20))*)
>>>
>>>
>>> .aggregate(new EventAggregator())
>>>
>>>
>>> .filter(new FinalFilter())
>>>
>>>
>>> .addSink(...)
>>>
>>>
>>> Appreciate your help.
>>>
>>>
>>> Thanks,
>>>
>>> chandu
>>>
>>


Re: Emit intermediate accumulator state of a session window

2019-12-08 Thread Rafi Aroch
Hi Chandu,

Maybe you can use a custom trigger:
* .trigger(**ContinuousEventTimeTrigger.of(Time.minutes(15)))*

This would continuously trigger your aggregate every period of time.

Thanks,
Rafi


On Thu, Dec 5, 2019 at 1:09 PM Andrey Zagrebin  wrote:

> Hi Chandu,
>
> I am not sure whether using the windowing API is helpful in this case at
> all.
>
> At least, you could try to consume the data not only by windowing but also
> by a custom stateful function.
> You look into the AggregatingState [1]. Then you could do whatever you
> want with the current aggregated value.
> If you still need to do something with the result of windowing, you could
> do it as now or simulate it with timers [2] in that same stateful function.
>
> Best,
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example
>
> On Tue, Dec 3, 2019 at 12:21 AM chandu soa  wrote:
>
>> *Emit intermediate accumulator(AggregateFunction ACC value) state of a
>> session window when new event arrives*
>>
>>
>>
>> AggregateFunction#getResults() is called only when window completes. My
>> need is emit intermediate accumulator values(result of
>> AggregateFunction#add()) as well and write them to Sink. Both
>> AggregateFunction#getResult() and ProcessWindowFunction() provides
>> aggregated result, only when the window is closed.
>>
>> *Any thoughts please, how to emit or stream intermediate accumulator
>> state as soon as new event arrive when window is open? Need to implement
>> custom trigger or Assigner?*
>>
>>
>>
>> To give you some background, when user watches a video we get events -
>> when clicked, thereafter every ~ 15minutes, and finally when user close the
>> video.
>>
>> I need to aggregate them as soon as they arrive and post it to
>> destination. For example, if user watching a two-hour movie I get events
>> for 15 min interval(0,15,30,...,120), whenever I get a event need to
>> aggregate watched percentage so far and write it to sink(0%, 12.5%,
>> 25%,...,100%). The below implementation emitting(getResult()) a single
>> event 20 minutes after watching a video.
>>
>>
>>
>>
>>
>> .window(*EventTimeSessionWindows.withGap(Time.minutes(20))*)
>>
>>
>> .aggregate(new EventAggregator())
>>
>>
>> .filter(new FinalFilter())
>>
>>
>> .addSink(...)
>>
>>
>> Appreciate your help.
>>
>>
>> Thanks,
>>
>> chandu
>>
>


Re: Emit intermediate accumulator state of a session window

2019-12-05 Thread Andrey Zagrebin
Hi Chandu,

I am not sure whether using the windowing API is helpful in this case at
all.

At least, you could try to consume the data not only by windowing but also
by a custom stateful function.
You look into the AggregatingState [1]. Then you could do whatever you want
with the current aggregated value.
If you still need to do something with the result of windowing, you could
do it as now or simulate it with timers [2] in that same stateful function.

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#example

On Tue, Dec 3, 2019 at 12:21 AM chandu soa  wrote:

> *Emit intermediate accumulator(AggregateFunction ACC value) state of a
> session window when new event arrives*
>
>
>
> AggregateFunction#getResults() is called only when window completes. My
> need is emit intermediate accumulator values(result of
> AggregateFunction#add()) as well and write them to Sink. Both
> AggregateFunction#getResult() and ProcessWindowFunction() provides
> aggregated result, only when the window is closed.
>
> *Any thoughts please, how to emit or stream intermediate accumulator state
> as soon as new event arrive when window is open? Need to implement custom
> trigger or Assigner?*
>
>
>
> To give you some background, when user watches a video we get events -
> when clicked, thereafter every ~ 15minutes, and finally when user close the
> video.
>
> I need to aggregate them as soon as they arrive and post it to
> destination. For example, if user watching a two-hour movie I get events
> for 15 min interval(0,15,30,...,120), whenever I get a event need to
> aggregate watched percentage so far and write it to sink(0%, 12.5%,
> 25%,...,100%). The below implementation emitting(getResult()) a single
> event 20 minutes after watching a video.
>
>
>
>
>
> .window(*EventTimeSessionWindows.withGap(Time.minutes(20))*)
>
>
> .aggregate(new EventAggregator())
>
>
> .filter(new FinalFilter())
>
>
> .addSink(...)
>
>
> Appreciate your help.
>
>
> Thanks,
>
> chandu
>