Hi,

Yes. ProcessWindowFunction.process receives the window's elements as an Iterable, so you can iterate over them and emit each one individually, simply not emitting the ones you want to discard.
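To illustrate the idea, here is a minimal plain-Java sketch of the selective-emit loop. It is not Flink code: the Map is only a stand-in for per-key state you would reach through context.globalState(), the returned list stands in for calls to Collector.collect, and the class name, the String record type, and the assumption that every record value is unique are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Plain-Java model of the loop inside a process(...) method; NOT the Flink API.
class SelectiveEmitSketch {

    // Hypothetical stand-in for per-key state that outlives a single window.
    // It grows without bound here; real state would need some form of cleanup.
    private final Map<String, Set<String>> emittedByKey = new HashMap<>();

    // Called once per window firing with all elements currently in the window.
    // Returns the records that would be emitted; everything else is discarded.
    List<String> process(String key, Iterable<String> elements) {
        Set<String> alreadyEmitted =
                emittedByKey.computeIfAbsent(key, k -> new HashSet<>());
        List<String> out = new ArrayList<>();
        for (String record : elements) {
            // Set.add returns false when an earlier firing already emitted this record.
            if (alreadyEmitted.add(record)) {
                out.add(record);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        SelectiveEmitSketch sketch = new SelectiveEmitSketch();
        // Early firing on the first element of window-1.
        System.out.println(sketch.process("k", Arrays.asList("record 1")));
        // Final firing of window-1: record 1 is still in the window but is skipped.
        System.out.println(sketch.process("k", Arrays.asList("record 1", "record 2")));
        // Firing of window-2.
        System.out.println(sketch.process("k", Arrays.asList("record 3", "record 4", "record 5")));
    }
}
```

With a trigger that FIREs without purging on the first element, the three calls in main print [record 1], [record 2] and [record 3, record 4, record 5], so the duplicate of record 1 disappears. The sketch only covers which elements get emitted; when the window fires is still decided by the trigger.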
Regards,
Roman

On Thu, Feb 25, 2021 at 7:22 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:

> Hello,
>
> I tried using *ProcessWindowFunction* since it gives access to
> *globalState* through *context*. My question is: is it possible to
> discard single events inside the *process* function of *ProcessWindowFunction*,
> just like in *onElement* of triggers?
> For my use case it seems that a trigger is not sufficient, but I want to know
> how I can do it using ProcessWindowFunction. Appreciate any pointers.
>
> Thanks!
>
> On Wed, Feb 24, 2021 at 10:50 AM Diwakar Jha <diwakar.n...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> Thanks. I tried FIRE instead of FIRE_AND_PURGE and it introduced
>> duplicates, though the result is still the same, i.e. record 1 is fired both
>> at the start and the end of the window. So for every window I see the first
>> event of the window coming twice in the output.
>>
>> I'm trying to explain the desired behaviour again; hopefully it becomes
>> clear.
>>
>> All the records have the same key.
>> Current output:
>>
>>> record 1 : first event in the window-1 : fired
>>> record 2 : last event in the window-1 : fired
>>> record 3 : first event in the window-2 : fired. [this should not have
>>> fired since it has the same Key as all other records.]
>>> record 4, record 5 : - 2 events in the window-2 : fired.
>>>
>>
>> Expected output:
>>
>>> record 1 : first event in the window-1 : fired
>>> record 2 : last event in the window-1 : fired
>>> record 3,4,5 : all event in the window-2 : fired
>>
>>
>> I think my problem is to store KeyBy values between windows. For example,
>> I want to retain the KeyBy for 1 day. In that case, record 1 is fired
>> instantly, and all other records (of the same key as record 1) are always
>> grouped in each window (say 1 min) instead of firing instantly.
>>
>> Thanks!
>>
>> On Wed, Feb 24, 2021 at 6:19 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Diwakar,
>>>
>>> the issue is that you FIRE_AND_PURGE the state. You should just FIRE on
>>> the first element (or else you lose the information that you received the
>>> element already).
>>> You'd use FIRE_AND_PURGE on the last element though.
>>>
>>> On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi Diwakar,
>>>>
>>>> I'm not sure I fully understand your question.
>>>> If event handling in one window depends on some other windows, then
>>>> TriggerContext.getPartitionedState cannot be used. Triggers don't have
>>>> access to the global state (only to key-window scoped state).
>>>> If that's what you want, then please consider ProcessWindowFunction [1],
>>>> where you can use context.globalState() in your process function.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <diwakar.n...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Hello,
>>>>>
>>>>> I'm trying to use a custom trigger for one of my use cases. I have a
>>>>> basic logic (as shown below) of using keyBy on the input stream and
>>>>> using a window of 1 min.
>>>>>
>>>>> .keyBy(<key selector>)
>>>>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>>>> .trigger(new CustomTrigger())
>>>>> .aggregate(Input.getAggregationFunction(), new AggregationProcessingWindow());
>>>>>
>>>>>
>>>>> My custom trigger is expected to fire the first event of the keyBy
>>>>> instantly, and any subsequent events should be aggregated in the window.
>>>>>
>>>>> .trigger(new Trigger<Record, TimeWindow>() {
>>>>>>   @Override
>>>>>>   public TriggerResult onElement(Record record, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>     ValueState<Boolean> firstSeen = triggerContext.getPartitionedState(firstSceenDescriptor);
>>>>>>     if (firstSeen.value() == null) {
>>>>>>       firstSeen.update(true);
>>>>>>       // fire trigger to early evaluate window and purge that event.
>>>>>>       return TriggerResult.FIRE_AND_PURGE;
>>>>>>     }
>>>>>>     // Continue. Do not evaluate window per element
>>>>>>     return TriggerResult.CONTINUE;
>>>>>>   }
>>>>>>   @Override
>>>>>>   public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>     // final evaluation and purge window state
>>>>>>     return TriggerResult.FIRE_AND_PURGE;
>>>>>>   }
>>>>>>   @Override
>>>>>>   public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>     return TriggerResult.CONTINUE;
>>>>>>   }
>>>>>>   @Override
>>>>>>   public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>>
>>>>>>   }
>>>>>> })
>>>>>
>>>>>
>>>>> Currently, I see (for each window and same key) the first event of the
>>>>> window is always fired. But I want to see this happening for only the
>>>>> first window and for the subsequent window it should aggregate all the
>>>>> events and then fire.
>>>>>
>>>>> Example : all the records have the same key.
>>>>> current output.
>>>>> record 1 : first event in the window-1 : fired
>>>>> record 2 : last event in the window-1 : fired
>>>>> record 3 : first event in the window-2 : fired
>>>>> record 4, record 5 : - 2 events in the window-2 : fired.
>>>>>
>>>>> expected output.
>>>>> record 1 : first event in the window-1 : fired
>>>>> record 2 : last event in the window-1 : fired
>>>>> record 3,4,5 : all event in the window-2 : fired
>>>>>
>>>>> window-2 should not fire the first event of the same key.
>>>>>
>>>>> I'm reading it here
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>>>>> but not able to solve it. Any pointers would be helpful.
>>>>>
>>>>> Thanks.
>>>>>
>>>>