Re: Flink custom trigger use case

2021-02-25 Roman Khachatryan
Hi,

Yes: the ProcessWindowFunction receives the window's elements as an Iterable,
so you can emit them individually (and simply skip the ones you don't want).
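A minimal sketch of that idea, written as a plain-Java model rather than against Flink's API so that it runs on its own. `Event`, `ProcessModel` and `markFiredEarly` are hypothetical names, and the map stands in for key-scoped state such as `context.globalState()`. The model walks the window's Iterable, emits elements one at a time, and discards a single element: one that is assumed to have been emitted already by an early firing. Dropping that element is one way to avoid the duplicate that appears when the first element is only FIREd and therefore stays in the window.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical event type for illustration: a key plus a sequence number.
final class Event {
    final String key;
    final int seq;

    Event(String key, int seq) {
        this.key = key;
        this.seq = seq;
    }

    @Override
    public String toString() {
        return key + "#" + seq;
    }
}

// Plain-Java model of a ProcessWindowFunction body (NOT Flink's API).
// The map stands in for key-scoped state that outlives any single window.
final class ProcessModel {
    private final Map<String, Integer> firedEarlySeq = new HashMap<>();

    // Remember which element of this key was already emitted by an early firing.
    void markFiredEarly(Event e) {
        firedEarlySeq.putIfAbsent(e.key, e.seq);
    }

    // Called with all elements of one window for one key: emits them one by one
    // and discards the single element that was already emitted early.
    List<Event> process(String key, Iterable<Event> elements) {
        List<Event> out = new ArrayList<>();
        Integer skip = firedEarlySeq.get(key);
        for (Event e : elements) {
            if (skip != null && e.seq == skip) {
                continue; // discard this single event
            }
            out.add(e); // emit individually (out.collect(e) in a real function)
        }
        return out;
    }
}
```

In an actual ProcessWindowFunction the loop would call `out.collect(e)` on the Collector, and the map would be replaced by a `ValueState` obtained through `context.globalState().getState(...)`.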

Regards,
Roman



Re: Flink custom trigger use case

2021-02-24 Diwakar Jha
Hello,

I tried using *ProcessWindowFunction*, since it gives access to *global state*
through its *context*. My question is: is it possible to discard single events
inside the *process* method of a *ProcessWindowFunction*, just like in the
*onElement* method of a trigger?
For my use case it seems that a trigger is not sufficient, but I want to know
how I can do it using a ProcessWindowFunction. I'd appreciate any pointers.

Thanks!


Re: Flink custom trigger use case

2021-02-24 Diwakar Jha
Hi Arvid,

Thanks. I tried FIRE instead of FIRE_AND_PURGE, but it introduced duplicates
and the result is otherwise the same, i.e. record 1 is fired both at the start
and at the end of the window. So for every window I see the first event of the
window twice in the output.
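Why FIRE on the first element leads to the duplicate can be seen in a simplified plain-Java model of one window's buffered contents (hypothetical `WindowBufferModel`, not Flink's API). It assumes the semantics of Flink's `TriggerResult`: FIRE evaluates the window on its current contents and keeps them, while FIRE_AND_PURGE evaluates and then discards them. In this model an evaluation with no buffered elements is skipped.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified plain-Java model of one window's buffered contents (NOT Flink's API).
final class WindowBufferModel {
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    private final List<String> contents = new ArrayList<>();
    private final List<List<String>> firings = new ArrayList<>();

    void add(String element, Result onElement) {
        contents.add(element);
        apply(onElement);
    }

    // The trigger's final decision when the window ends.
    void endOfWindow() {
        apply(Result.FIRE_AND_PURGE);
    }

    private void apply(Result r) {
        if (r == Result.CONTINUE || contents.isEmpty()) {
            return; // nothing to evaluate
        }
        firings.add(new ArrayList<>(contents)); // the window function sees a snapshot
        if (r == Result.FIRE_AND_PURGE) {
            contents.clear(); // purge: drop the buffered elements
        }
    }

    // Replays one window: the first element gets `first`, later ones CONTINUE.
    static List<List<String>> replay(Result first, String... elements) {
        WindowBufferModel w = new WindowBufferModel();
        for (int i = 0; i < elements.length; i++) {
            w.add(elements[i], i == 0 ? first : Result.CONTINUE);
        }
        w.endOfWindow();
        return w.firings;
    }
}
```

Replaying record 1 (FIRE) and record 2 (CONTINUE), followed by the final FIRE_AND_PURGE, gives two evaluations, `[record 1]` and `[record 1, record 2]`, so record 1 is emitted twice. With FIRE_AND_PURGE on the first element the evaluations are `[record 1]` and `[record 2]`.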

Let me try to explain the desired behaviour again; hopefully it becomes
clearer.

All the records have the same key.
Current output:

> record 1 : first event in window-1 : fired
> record 2 : last event in window-1 : fired
> record 3 : first event in window-2 : fired [this should not have
> fired, since it has the same key as all the other records]
> record 4, record 5 : 2 events in window-2 : fired
>

Expected output:

> record 1 : first event in window-1 : fired
> record 2 : last event in window-1 : fired
> record 3, 4, 5 : all events in window-2 : fired


I think my problem is how to remember keys across windows. For example, I
want to retain each key for 1 day. In that case, record 1 is fired
instantly, and all other records with the same key as record 1 are always
grouped in each window (say 1 min) instead of firing instantly.
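As a rough illustration of the desired output, here is a plain-Java simulation (not Flink code; `DesiredBehaviourModel` and `In` are hypothetical names). Assumptions: events arrive in order, each tagged with the index of its 1-minute window; a window closes when a later one starts; and the 1-day retention is ignored, so keys are remembered forever.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java simulation (NOT Flink's API) of the behaviour described above.
final class DesiredBehaviourModel {
    // Hypothetical input: a key, the index of its 1-minute window, and a label.
    static final class In {
        final String key;
        final int window;
        final String name;

        In(String key, int window, String name) {
            this.key = key;
            this.window = window;
            this.name = name;
        }
    }

    static List<String> run(List<In> events) {
        Set<String> seenKeys = new HashSet<>();                 // per key, outlives windows
        Map<String, List<String>> open = new LinkedHashMap<>(); // per key, current window only
        List<String> out = new ArrayList<>();
        int current = Integer.MIN_VALUE;
        for (In e : events) {
            if (e.window != current) { // a later window starts: close the old one
                flush(current, open, out);
                current = e.window;
            }
            if (seenKeys.add(e.key)) {
                out.add("instant: " + e.name); // first event ever for this key
            } else {
                open.computeIfAbsent(e.key, k -> new ArrayList<>()).add(e.name);
            }
        }
        flush(current, open, out);
        return out;
    }

    // Emits one aggregated result per key for the window that just closed.
    private static void flush(int window, Map<String, List<String>> open, List<String> out) {
        for (Map.Entry<String, List<String>> entry : open.entrySet()) {
            out.add("window-" + window + " " + entry.getKey() + ": " + entry.getValue());
        }
        open.clear();
    }
}
```

Running it on records 1 to 5 with key `k` (records 1 and 2 in window 1, records 3 to 5 in window 2) yields `instant: record 1`, `window-1 k: [record 2]` and `window-2 k: [record 3, record 4, record 5]`, which matches the expected output. The essential choice is that `seenKeys` is indexed by key alone, so it is not reset when a new window starts.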

Thanks!


Re: Flink custom trigger use case

2021-02-24 Arvid Heise
Hi Diwakar,

The issue is that you FIRE_AND_PURGE the state; you should just FIRE on the
first element (or else you lose the information that you already received
the element).
You'd use FIRE_AND_PURGE on the last element, though.


Re: Flink custom trigger use case

2021-02-23 Khachatryan Roman
Hi Diwakar,

I'm not sure I fully understand your question.
If event handling in one window depends on other windows, then
TriggerContext.getPartitionedState cannot be used. Triggers don't have
access to global state (only to key-window scoped state).
If that's what you want, please consider a ProcessWindowFunction [1],
where you can use context.globalState() in your process function.
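The effect of the state scope can be seen in a small plain-Java model (not Flink's API; `StateScopeModel` is a hypothetical name). It replays the thread's example (one key, records 1 and 2 in window 1, records 3 to 5 in window 2) and reports which records find no `firstSeen` flag yet. It models only the lookup. A real trigger still cannot read key-only state, so with a ProcessWindowFunction the decision to emit or drop a record would be made inside process().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model (NOT Flink's API) of where a "firstSeen" flag is stored.
final class StateScopeModel {
    // KEY_AND_WINDOW ~ trigger state from TriggerContext.getPartitionedState;
    // KEY_ONLY ~ state from a ProcessWindowFunction's context.globalState().
    enum Scope { KEY_AND_WINDOW, KEY_ONLY }

    // Each event is {key, windowIndex, label}; returns the labels that find
    // no flag yet, i.e. the ones treated as "first seen".
    static List<String> treatedAsFirst(Scope scope, String[][] events) {
        Set<String> flags = new HashSet<>();
        List<String> first = new ArrayList<>();
        for (String[] e : events) {
            String slot = scope == Scope.KEY_ONLY ? e[0] : e[0] + "@" + e[1];
            if (flags.add(slot)) {
                first.add(e[2]);
            }
        }
        return first;
    }
}
```

With the flag scoped to key and window the model reports records 1 and 3, which matches the "current output" in this thread. With the flag scoped to the key alone it reports only record 1.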

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction

Regards,
Roman



Re: Flink custom trigger use case

2021-02-23 Diwakar Jha
Hi Roman,

Thanks for your reply! That was a typo; I'm using
TumblingProcessingTimeWindows.
My problem is that I want to stop the first-event trigger (per key) for every
window except the first one. Right now, my first event is getting fired in
every window. Will checking that *the state (firstSeen) value is true, not
just that it exists* also change the result per window?

Thanks!


Re: Flink custom trigger use case

2021-02-23 Roman Khachatryan
Hi,

I've noticed that you are using an event-time window, but the trigger fires
based on processing time.
You should also register an event-time timer (for the window end) so that
trigger.onEventTime() is called.
It's also safer to check that the state (firstSeen) value is true, not just
that it exists.
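The difference between checking that `firstSeen` exists and checking that it is true can be shown with a tiny plain-Java illustration (not Flink's API; `FirstSeenCheck` is a hypothetical name). A nullable `Boolean` stands in for `ValueState<Boolean>.value()`, which returns null when nothing has been stored for the current key (and, in a trigger, window).

```java
// Plain-Java illustration (NOT Flink's API): a nullable Boolean stands in for
// ValueState<Boolean>.value(), which is null when nothing has been stored.
final class FirstSeenCheck {
    // "Exists" check: an explicitly stored false would still count as seen.
    static boolean seenIfExists(Boolean stored) {
        return stored != null;
    }

    // "Is true" check: only an explicitly stored true counts as seen.
    static boolean seenIfTrue(Boolean stored) {
        return Boolean.TRUE.equals(stored);
    }
}
```

The two checks only disagree when `false` has been stored explicitly. For the timer part, Flink's built-in EventTimeTrigger is a useful reference: its onElement registers an event-time timer at `window.maxTimestamp()`.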

Regards,
Roman



Fwd: Flink custom trigger use case

2021-02-23 Diwakar Jha
Hello,

Posting again for help. I'm planning to use state TTL, but I'd like to
know if there is any other way to do it. I'm using Flink 1.11.
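Since state TTL is the planned approach, here is a plain-Java model of a key-scoped `firstSeen` flag that expires a fixed time after it was written. `TtlFirstSeenModel` is a hypothetical name. It is not Flink's `StateTtlConfig`, and the exact expiry boundary used here is an assumption. In Flink 1.11 the corresponding configuration would be a `StateTtlConfig` built with `StateTtlConfig.newBuilder(Time.days(1))` and attached through `ValueStateDescriptor#enableTimeToLive`. For the TTL to matter across windows, the flag has to live in key-scoped state rather than in trigger state, which is already scoped to a single window.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (NOT Flink's StateTtlConfig) of a key-scoped "first seen"
// flag that expires a fixed time after it was written.
final class TtlFirstSeenModel {
    private final long ttlMillis;
    private final Map<String, Long> writtenAt = new HashMap<>();

    TtlFirstSeenModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Returns true if the event should fire instantly: the key was never seen,
    // or its flag is at least ttlMillis old. Writing the flag restarts the TTL.
    boolean fireInstantly(String key, long nowMillis) {
        Long t = writtenAt.get(key);
        boolean expired = t == null || nowMillis - t >= ttlMillis;
        if (expired) {
            writtenAt.put(key, nowMillis);
        }
        return expired;
    }
}
```

With a 1-day TTL (86,400,000 ms), an event at time 0 fires instantly, and later events up to just under 24 hours do not. The first event at or after the 24-hour mark fires instantly again and restarts the period.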
Thanks!

---------- Forwarded message ---------
From: Diwakar Jha 
Date: Mon, Feb 22, 2021 at 6:28 PM
Subject: Flink custom trigger use case
To: user 





Flink custom trigger use case

2021-02-22 Diwakar Jha
Hello,

I'm trying to use a custom trigger for one of my use cases. My basic logic
(shown below) applies keyBy to the input stream and uses a 1-minute window.

.keyBy()
.window(TumblingEventTimeWindows.of(Time.seconds(60)))
.trigger(new CustomTrigger())
.aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());


My custom trigger is expected to fire the first event of each key instantly;
any subsequent events should be aggregated in the window.

.trigger(new Trigger<Record, TimeWindow>() {
    @Override
    public TriggerResult onElement(Record record, long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        ValueState<Boolean> firstSeen =
                triggerContext.getPartitionedState(firstSceenDescriptor);
        if (firstSeen.value() == null) {
            firstSeen.update(true);
            // fire trigger to early evaluate window and purge that event.
            return TriggerResult.FIRE_AND_PURGE;
        }
        // Continue. Do not evaluate window per element
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        // final evaluation and purge window state
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long l, TimeWindow timeWindow,
            TriggerContext triggerContext) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow timeWindow, TriggerContext triggerContext)
            throws Exception {

    }
})




Currently, for each window and the same key, the first event of the window is
always fired. But I want this to happen only for the first window; for the
subsequent windows, all the events should be aggregated and then fired.

Example: all the records have the same key.

Current output:
record 1 : first event in window-1 : fired
record 2 : last event in window-1 : fired
record 3 : first event in window-2 : fired
record 4, record 5 : 2 events in window-2 : fired

Expected output:
record 1 : first event in window-1 : fired
record 2 : last event in window-1 : fired
record 3, 4, 5 : all events in window-2 : fired

Window-2 should not fire the first event of the same key.

I've been reading
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
but haven't been able to solve it. Any pointers would be helpful.

Thanks.