Hi Till,
When I run the example code that you posted, the order of the three
messages (window started, contents of window and window ended) is
non-deterministic. This is surprising to me, as setParallelism(1) has been
used in the pipeline - I assumed this would eliminate any race
conditions when printing. What's more, if I *remove*
setParallelism(1) from the code, the output is deterministic and correct
(i.e. windowStarted -> windowContents -> windowEnded).

Clearly, something is wrong with my understanding. What is it?

On Fri, Feb 28, 2020 at 1:58 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Great to hear that you solved the problem. Let us know if you run into any
> other issues.
>
> Cheers,
> Till
>
> On Fri, Feb 28, 2020 at 8:08 AM Manas Kale <manaskal...@gmail.com> wrote:
>
>> Hi,
>> This problem is solved[1]. The issue was that the BroadcastStream did not
>> contain any watermark, which prevented watermarks for any downstream
>> operators from advancing.
>> I appreciate all the help.
>> [1]
>> https://stackoverflow.com/questions/60430520/how-do-i-fire-downstream-oneventtime-method-when-using-broadcaststate-pattern
>>
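To make the watermark issue above concrete, here is a minimal plain-Java sketch (no Flink dependency). It relies on the fact that an operator with two inputs advances its event-time clock to the minimum of the latest watermarks seen on its inputs. The class and method names are hypothetical, and the use of Long.MAX_VALUE on the broadcast side is only one possible choice, assuming the broadcast elements should never hold back event time.

```java
import java.util.Arrays;

/** Plain-Java illustration of how a two-input operator combines watermarks. */
public class WatermarkMinSketch {
    /** Value an input reports before it has emitted any watermark. */
    static final long NO_WATERMARK = Long.MIN_VALUE;

    /** The operator's event-time clock is the minimum over the latest watermark of each input. */
    static long combinedWatermark(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(NO_WATERMARK);
    }

    /** An event-time timer fires only once the combined watermark has reached its timestamp. */
    static boolean timerFires(long timerTimestamp, long... inputWatermarks) {
        return combinedWatermark(inputWatermarks) >= timerTimestamp;
    }

    public static void main(String[] args) {
        long dataWatermark = 10_000L;
        // Broadcast side never emits a watermark: the timer at 5000 stays pending.
        System.out.println(timerFires(5_000L, dataWatermark, NO_WATERMARK));
        // Broadcast side carries a watermark (here Long.MAX_VALUE): the timer can fire.
        System.out.println(timerFires(5_000L, dataWatermark, Long.MAX_VALUE));
    }
}
```
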
>> Thanks,
>> Manas
>>
>> On Thu, Feb 27, 2020 at 4:28 PM Manas Kale <manaskal...@gmail.com> wrote:
>>
>>> Hi Rafi and Till,
>>> Thank you for pointing out that edge case, Rafi.
>>>
>>> Till, I am trying to get this example working with the BroadcastState
>>> pattern upstream to the window operator[1]. The problem is that introducing
>>> the BroadcastState makes the onEventTime() *never* fire. Is the
>>> BroadcastState somehow eating up the watermark? Do I need to generate the
>>> watermark again in the KeyedBroadcastProcessFunction?
>>>
>>> [1] https://gist.github.com/manasIU/1777c9c99e195a409441815559094b49
>>>
>>> Thanks,
>>> Manas
>>>
>>> On Fri, Feb 21, 2020 at 8:55 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Hi Manas and Rafi,
>>>>
>>>> you are right that when using merging windows, as event time session
>>>> windows are, Flink requires that any state the Trigger keeps is of
>>>> type MergingState. This constraint allows the state to be merged
>>>> whenever two windows get merged.
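
A small plain-Java sketch of why the state has to be mergeable (no Flink dependency, all names hypothetical): a plain value has no rule for combining two copies, whereas state that carries a reduce function does. Here the "started window message already sent" flag of two session windows is combined with logical OR when the windows merge.

```java
import java.util.function.BinaryOperator;

public class MergingFlagSketch {
    /** Hypothetical stand-in for a per-window state cell that knows how to merge. */
    static final class MergeableFlag {
        private final BinaryOperator<Boolean> reduce;
        private boolean value;

        MergeableFlag(BinaryOperator<Boolean> reduce, boolean initial) {
            this.reduce = reduce;
            this.value = initial;
        }

        /** Folds a new value into the cell using the reduce function. */
        void add(boolean v) {
            value = reduce.apply(value, v);
        }

        boolean get() {
            return value;
        }

        /** Combines the cells of two windows that are being merged into one. */
        MergeableFlag mergeWith(MergeableFlag other) {
            return new MergeableFlag(reduce, reduce.apply(value, other.value));
        }
    }

    public static void main(String[] args) {
        MergeableFlag left = new MergeableFlag(Boolean::logicalOr, false);
        MergeableFlag right = new MergeableFlag(Boolean::logicalOr, false);
        right.add(true); // the right-hand window already sent its "started" message
        // After the merge the combined window remembers that a message was sent.
        System.out.println(left.mergeWith(right).get());
    }
}
```
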
>>>>
>>>> Rafi, you are right. With the current implementation it might happen
>>>> that you send a wrong "started window" message. I think it depends on the
>>>> MIN_WINDOW_SIZE and the distribution of your timestamps and, hence, also
>>>> your watermark. If you want to be on the safe side, I would recommend
>>>> using the ProcessFunction to implement the required logic. The
>>>> ProcessFunction [1] is Flink's low-level API and gives you access to state
>>>> and timers. In it, you would need to buffer the elements and sessionize
>>>> them yourself, though. However, it would give you access to the
>>>> watermark, which in turn would allow you to properly handle the edge case
>>>> you described.
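
As a rough idea of what "buffer and sessionize yourself" could look like, here is a plain-Java sketch without any Flink classes. It only shows the grouping step: buffered timestamps are sorted by event time and split wherever two neighbours are more than `gap` apart. The names and the "at most `gap` apart" rule are assumptions of this sketch. Because the buffer is sorted before grouping, an event that arrives late but carries the earliest timestamp still becomes the session start.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.TreeSet;

public class SessionizeSketch {
    /**
     * Groups buffered event timestamps into sessions.
     * Each session is returned as {firstTimestamp, lastTimestamp}.
     * Assumption of this sketch: events at most `gap` ms apart share a session.
     */
    static List<long[]> sessionize(Collection<Long> bufferedTimestamps, long gap) {
        List<long[]> sessions = new ArrayList<>();
        long[] current = null;
        // TreeSet sorts by event time, independent of arrival order.
        for (long ts : new TreeSet<>(bufferedTimestamps)) {
            if (current != null && ts - current[1] <= gap) {
                current[1] = ts;                  // extend the open session
            } else {
                current = new long[] {ts, ts};    // start a new session
                sessions.add(current);
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // 1000 arrives after 1500 but is earlier in event time.
        for (long[] s : sessionize(Arrays.asList(1500L, 1000L, 9000L), 2000L)) {
            System.out.println(s[0] + " .. " + s[1]);
        }
    }
}
```
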
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Thu, Feb 20, 2020 at 12:25 PM Rafi Aroch <rafi.ar...@gmail.com>
>>>> wrote:
>>>>
>>>>> I think one edge case which is not handled is when the first event
>>>>> (by event time) arrives late: a wrong "started-window" would then be
>>>>> reported.
>>>>>
>>>>> Rafi
>>>>>
>>>>>
>>>>> On Thu, Feb 20, 2020 at 12:36 PM Manas Kale <manaskal...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Is the reason ValueState cannot be used that session windows are
>>>>>> always formed by merging proto-windows of single elements, so a
>>>>>> state store is needed that can handle merging? ValueState does not
>>>>>> provide this functionality, but a ReducingState does?
>>>>>>
>>>>>> On Thu, Feb 20, 2020 at 4:01 PM Manas Kale <manaskal...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> Thanks for your answer! You also answered the next question that I
>>>>>>> was about to ask: "Can we share state between a Trigger and a Window?"
>>>>>>> Currently the only (convoluted) way to share state between two
>>>>>>> operators is through the broadcast state pattern, right?
>>>>>>> Also, in your example, why can't we use a
>>>>>>> ValueStateDescriptor<Boolean> in the Trigger? I tried using it in my own
>>>>>>> example, but I am not able to call the mergePartitionedState() method
>>>>>>> on a ValueStateDescriptor.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Manas
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 18, 2020 at 7:20 PM Till Rohrmann <trohrm...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Manas,
>>>>>>>>
>>>>>>>> you can implement something like this with a bit of trigger magic.
>>>>>>>> What you need to do is to define your own trigger implementation which
>>>>>>>> keeps state to remember whether it has triggered the "started window"
>>>>>>>> message or not. In the stateful window function you would need to do
>>>>>>>> something similar. The first call could trigger the output of "window
>>>>>>>> started" and any subsequent call will trigger the evaluation of the
>>>>>>>> window. It would have been a bit easier if the trigger and the window
>>>>>>>> process function could share their internal state. Unfortunately, this
>>>>>>>> is not possible at the moment.
>>>>>>>>
>>>>>>>> I've drafted a potential solution which you can find here [1].
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://gist.github.com/tillrohrmann/5251f6d62e256b60947eea7b553519ef
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Mon, Feb 17, 2020 at 8:09 AM Manas Kale <manaskal...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I want to achieve the following using event time session windows:
>>>>>>>>>
>>>>>>>>>    1. When the difference between window.getStart() and the last
>>>>>>>>>    event timestamp in the window is greater than MIN_WINDOW_SIZE
>>>>>>>>>    milliseconds, I want to emit a message "Window started @ timestamp".
>>>>>>>>>    2. When the session window ends, i.e. the watermark passes
>>>>>>>>>    lastEventTimestamp + inactivityPeriod, I want to emit a message
>>>>>>>>>    "Window ended @ timestamp".
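
The two conditions above can be written down as a small plain-Java sketch for a single session (no Flink classes). The constant values, the class and method names, and the choice of which timestamp goes into each message are all assumptions made for illustration; events are assumed to arrive in event-time order.

```java
import java.util.ArrayList;
import java.util.List;

public class SessionMessagesSketch {
    static final long MIN_WINDOW_SIZE = 1_000L;    // assumed value, in ms
    static final long INACTIVITY_PERIOD = 5_000L;  // assumed value, in ms

    final List<String> out = new ArrayList<>();
    private long start = -1L;          // timestamp of the first event, -1 if no session is open
    private long last = -1L;           // timestamp of the latest event
    private boolean startedEmitted;    // whether "Window started" was already sent

    /** Condition 1: emit once when (lastEventTimestamp - start) > MIN_WINDOW_SIZE. */
    void onEvent(long timestamp) {
        if (start < 0) {
            start = timestamp;
        }
        last = Math.max(last, timestamp);
        if (!startedEmitted && last - start > MIN_WINDOW_SIZE) {
            startedEmitted = true;
            out.add("Window started @ " + start);
        }
    }

    /** Condition 2: emit when the watermark reaches lastEventTimestamp + inactivityPeriod. */
    void onWatermark(long watermark) {
        if (start >= 0 && watermark >= last + INACTIVITY_PERIOD) {
            out.add("Window ended @ " + last);
            start = -1L;               // reset so the next event opens a new session
            last = -1L;
            startedEmitted = false;
        }
    }

    public static void main(String[] args) {
        SessionMessagesSketch s = new SessionMessagesSketch();
        s.onEvent(0L);
        s.onEvent(500L);
        s.onEvent(1_500L);
        s.onWatermark(6_500L);
        s.out.forEach(System.out::println);
    }
}
```
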
>>>>>>>>>
>>>>>>>>> It is guaranteed that all events are on time and no lateness is
>>>>>>>>> allowed. I am having difficulty implementing both 1 and 2
>>>>>>>>> simultaneously.
>>>>>>>>> I am able to implement point 1 using a custom trigger, which
>>>>>>>>> checks if (lastEventTimestamp - window.getStart()) > MIN_WINDOW_SIZE
>>>>>>>>> and triggers a customProcessWindowFunction().
>>>>>>>>> However, with this architecture I can't detect the end of the
>>>>>>>>> window.
>>>>>>>>>
>>>>>>>>> Is my approach correct or is there a completely different method
>>>>>>>>> to achieve this?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Manas Kale
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
