Hi,

Yes, you have an Iterable with window elements as the ProcessWindowFunction
input. You can then emit them individually.

Regards,
Roman


On Thu, Feb 25, 2021 at 7:22 AM Diwakar Jha <diwakar.n...@gmail.com> wrote:

> Hello,
>
> I tried using *processWindowFunction* since it gives access to
> *globalstate* through *context*. My question is, Is it possible to
> discard single events inside *process* function of *processWindowFunction*
> just like *onElements* of triggers?
> For my use case it seems that trigger is not sufficient but i want to know
> how i can do it using processWindowFunction. Appreciate any pointers.
>
> Thanks!
>
> On Wed, Feb 24, 2021 at 10:50 AM Diwakar Jha <diwakar.n...@gmail.com>
> wrote:
>
>> Hi Arvid,
>>
>> Thanks. I tried FIRE instead of FIRE_AND_PURGE and it introduced
>> duplicates though the result is still the same i.e record 1 is fired both
>> at the start and the end of the window. so for every window i see the first
>> event of the window is coming twice in the output.
>>
>> I'm trying to explain again the desired behaviour, hopefully it becomes
>> clear.
>>
>> all the records have the same key.
>> current output.
>>
>>> record 1 : first event in the window-1 : fired
>>> record 2 : last event in the window-1 : fired
>>> record 3 : first event in the window-2 : fired. [this should not have
>>> fired since it has the same Key as all other records.]
>>> record 4, record 5 : - 2 events in the window-2 : fired.
>>>
>>
>> expected output.
>>
>>> record 1 : first event in the window-1 : fired
>>> record 2 : last event in the window-1 : fired
>>> record 3,4,5 : all event in the window-2 : fired
>>
>>
>> I think my problem is to store KeyBy values between windows. For example,
>> I want to retain the KeyBy for 1 day. In that case, record 1 is fired
>> instantly, all other records (of same key as record1) are always grouped in
>> each window (say 1 min) instead of firing instantly.
>>
>> Thanks!
>>
>> On Wed, Feb 24, 2021 at 6:19 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Diwakar,
>>>
>>> the issue is that you fire_and_purge the state, you should just FIRE on
>>> the first element (or else you lose the information that you received the
>>> element already).
>>> You'd use FIRE_AND_PURGE on the last element though.
>>>
>>> On Wed, Feb 24, 2021 at 7:16 AM Khachatryan Roman <
>>> khachatryan.ro...@gmail.com> wrote:
>>>
>>>> Hi Diwakar,
>>>>
>>>> I'm not sure I fully understand your question.
>>>> If event handling in one window depends on some other windows than
>>>> TriggerContext.getPartitionedState can not be used. Triggers don't have
>>>> access to the global state (only to key-window scoped state).
>>>> If that's what you want then please consider ProcessWindowFunction [1]
>>>> where you can use context.globalState() in your process function.
>>>>
>>>> [1]
>>>>
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>>>>
>>>> Regards,
>>>> Roman
>>>>
>>>>
>>>> On Tue, Feb 23, 2021 at 3:29 AM Diwakar Jha <diwakar.n...@gmail.com>
>>>> wrote:
>>>>
>>>>>
>>>>> Hello,
>>>>>
>>>>> I'm trying to use a custom trigger for one of my use case. I have a
>>>>> basic logic (as shown below) of using keyBy on the input stream and using 
>>>>> a
>>>>> window of 1 min.
>>>>>
>>>>> .keyBy(<key selector>)
>>>>> .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>>>>> .trigger(new CustomTrigger())
>>>>> .aggregate(Input.getAggregationFunction(), new
>>>>> AggregationProcessingWindow());
>>>>>
>>>>>
>>>>> My custom trigger is expected to fire the first event of the keyBy
>>>>> instantly and any subsequent events should be aggregated in the window.
>>>>>
>>>>> .trigger(new Trigger<Record, TimeWindow>() {
>>>>>> @Override
>>>>>> public TriggerResult onElement(Record record, long l, TimeWindow
>>>>>> timeWindow, TriggerContext triggerContext) throws Exception {
>>>>>> ValueState<Boolean> firstSeen =
>>>>>> triggerContext.getPartitionedState(firstSceenDescriptor);
>>>>>> if(firstSeen.value() == null) {
>>>>>> firstSeen.update(true);
>>>>>> // fire trigger to early evaluate window and purge that event.
>>>>>> return TriggerResult.FIRE_AND_PURGE;
>>>>>> }
>>>>>> // Continue. Do not evaluate window per element
>>>>>> return TriggerResult.CONTINUE;
>>>>>> }
>>>>>> @Override
>>>>>> public TriggerResult onProcessingTime(long l, TimeWindow timeWindow,
>>>>>> TriggerContext triggerContext) throws Exception {
>>>>>> // final evaluation and purge window state
>>>>>> return TriggerResult.FIRE_AND_PURGE;
>>>>>> }
>>>>>> @Override
>>>>>> public TriggerResult onEventTime(long l, TimeWindow timeWindow,
>>>>>> TriggerContext triggerContext) throws Exception {
>>>>>> return TriggerResult.CONTINUE;
>>>>>> }
>>>>>> @Override
>>>>>> public void clear(TimeWindow timeWindow, TriggerContext
>>>>>> triggerContext) throws Exception {
>>>>>>
>>>>>> }
>>>>>> })
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Currently, I see (for each window and same key) the first event of the
>>>>> window is always fired. But I want to see this happening for only the 
>>>>> first
>>>>> window and for the subsequent window it should aggregate all the events 
>>>>> and
>>>>> then fire.
>>>>>
>>>>> Example : all the records have the same key.
>>>>> current output.
>>>>> record 1 : first event in the window-1 : fired record 2 : last event
>>>>> in the window-1 : fired record 3 : first event in the window-2 : fired
>>>>> record 4, record 5 : - 2 events in the window-2 : fired.
>>>>>
>>>>> expected output.
>>>>> record 1 : first event in the window-1 : fired record 2 : last event
>>>>> in the window-1 : fired record 3,4,5 : all event in the window-2 : fired
>>>>> window-2 should not fire the first event of the same key.
>>>>>
>>>>> I'm reading it here
>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#fire-and-purge
>>>>> but not able to solve it. Any pointers would be helpful.
>>>>>
>>>>> Thanks.
>>>>>
>>>>

Reply via email to