Thanks

On Wed, Oct 7, 2020 at 7:06 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Ori,
>
> you are right. Late events are sent to the late-data side output if the
> event's timestamp + the allowed lateness is less than or equal to the
> current watermark. These events are forwarded immediately to the
> downstream operators that consume the late-data side output.
>
> Cheers,
> Till
>
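To make Till's condition concrete, here is a simplified, pure-Scala model of the late-data check. It is a sketch based on a reading of Flink's `WindowOperator`, not Flink code. The object and method names are hypothetical, and it assumes a single-element session window `[ts, ts + gap)` that does not merge with an existing session. In this model an element reaches the side output only when its own session window is already past the watermark (plus allowed lateness), so an element slightly behind the watermark can still be accepted by the main window.

```scala
// Hypothetical, simplified model of Flink's late-data routing for an
// event-time session window. Not Flink code; ignores merges with existing sessions.
object LateDataRouting {
  // Element-level check: timestamp + allowed lateness <= current watermark.
  def isElementLate(ts: Long, allowedLateness: Long, watermark: Long): Boolean =
    ts + allowedLateness <= watermark

  // Window-level check for the single-element session window [ts, ts + gap):
  // its max timestamp is ts + gap - 1, and it counts as late once
  // max timestamp + allowed lateness <= current watermark.
  def isSessionWindowLate(ts: Long, gap: Long, allowedLateness: Long, watermark: Long): Boolean =
    (ts + gap - 1) + allowedLateness <= watermark

  // Side output only if no non-late window accepted the element AND it is late itself.
  def goesToLateOutput(ts: Long, gap: Long, allowedLateness: Long, watermark: Long): Boolean =
    isSessionWindowLate(ts, gap, allowedLateness, watermark) &&
      isElementLate(ts, allowedLateness, watermark)

  def main(args: Array[String]): Unit = {
    val minute = 60 * 1000L
    val gap = 30 * minute
    val watermark = 100 * minute
    // 10 minutes behind the watermark: the element is late, but its session window is still open.
    println(goesToLateOutput(90 * minute, gap, 0L, watermark)) // prints false
    // 40 minutes behind the watermark: its whole session window has already passed.
    println(goesToLateOutput(60 * minute, gap, 0L, watermark)) // prints true
  }
}
```

With a gap of at least 1 ms, the window-level check already implies the element-level one; both are kept only to mirror the two conditions separately.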
> On Wed, Oct 7, 2020 at 2:32 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> After creating a toy example, I think I've misunderstood how the late-data
>> side output (`sideOutputLateData`) works.
>>
>> It seems that the late-data side output has nothing to do with windowing:
>> when events arrive late, they go straight to the side output, and the main
>> flow never fires a window for them for that specific key.
>>
>> On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> I ran an experiment in which I attached an evictor to the main window (not
>>> the late one) solely to write a debug file when the window fires. (I don't
>>> actually evict any events; the evictor just lets me write a debug object
>>> the moment the window fires.)
>>>
>>> I can see that the late-data window indeed fires before the main window:
>>> the debug file mentioned above does not exist yet, but late events _do_
>>> exist in the destination.
>>>
>>> Writing this debug object in the evictor rules out problems that might
>>> stem from logic in the process function, and it proves that the window of
>>> the late events indeed fires before the main window.
>>>
>>> Here's an outline of my job:
>>>
>>> val windowedStream = senv
>>>   .addSource(kafkaSource)
>>>   ... // some operators
>>>   // like BoundedOutOfOrderness, but ignores future timestamps
>>>   .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
>>>   ... // some more operators
>>>   .keyBy { case (meta, _) => meta.toPath }
>>>   .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main" window
>>>   .sideOutputLateData(lateDataTag)
>>>   .process(new ProcessSession(sessionPlayback, config))
>>>
>>> windowedStream
>>>   .map(new SerializeSession(sessionPlayback))
>>>   .addSink(sink)
>>>
>>> windowedStream
>>>   .getSideOutput(lateDataTag)
>>>   .keyBy { case (meta, _) => meta.toPath }
>>>   .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late" window
>>>   .process(new ProcessSession(sessionPlayback, config, true))
>>>   .map(new SerializeSession(sessionPlayback, late = true))
>>>
>>> So, to repeat the question: is this normal? And if not, how can I fix
>>> it?
>>>
>>> Thanks
>>>
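A rough timeline model may help explain why the late output can reach storage first. This is a sketch under stated assumptions, not Flink code: `IgnoreFutureTimestamps` is assumed to emit watermarks like Flink's bounded-out-of-orderness generator (max timestamp seen minus the bound minus 1 ms), there is a single watermark source, event time roughly tracks wall-clock time, the tumbling window uses offset 0, and the object and method names are hypothetical. Under those assumptions, the session window fires only once the watermark reaches its last timestamp + gap - 1, which requires an event at least gap + bound newer than the session's last event, while the processing-time window fires at the next 30-minute wall-clock boundary regardless of input.

```scala
// Hypothetical timeline model; not Flink code. Assumes watermark = max event
// timestamp seen - bound - 1 ms and an offset-0 processing-time tumbling window.
object FiringOrder {
  val Minute: Long = 60 * 1000L

  // The processing-time tumbling window containing `now` is
  // [now - now % size, now - now % size + size); it fires at its end
  // (Flink registers the timer for end - 1 ms).
  def tumblingFireTime(now: Long, size: Long): Long =
    now - now % size + size

  // The session window ending at lastTs + gap fires once
  // watermark >= lastTs + gap - 1. With watermark = maxSeen - bound - 1,
  // an event with timestamp >= lastTs + gap + bound must arrive first.
  def eventTimeNeededToFireSession(lastTs: Long, gap: Long, bound: Long): Long =
    lastTs + gap + bound

  def main(args: Array[String]): Unit = {
    val gap = 30 * Minute
    val bound = 10 * Minute
    // An open session's last event has event time 12:00 (minutes since midnight).
    val lastTs = 12 * 60 * Minute
    // A late event (from an earlier, already-closed session) reaches the side output at 12:05.
    val lateArrival = (12 * 60 + 5) * Minute

    println(tumblingFireTime(lateArrival, gap) / Minute)               // prints 750 (12:30)
    println(eventTimeNeededToFireSession(lastTs, gap, bound) / Minute) // prints 760 (12:40)
  }
}
```

In this model the late window fires at 12:30, whereas the session window cannot close before an event stamped 12:40 arrives, which is consistent with the ordering described above; a different watermark strategy in `IgnoreFutureTimestamps` would shift the numbers.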
>>> On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>>
>>>> I have a job with an event-time session window of 30 minutes.
>>>>
>>>> I output late events to a side output, where I have a tumbling
>>>> processing-time window of 30 minutes.
>>>>
>>>> I observe that the late events are written to storage before the "main"
>>>> events.
>>>>
>>>> I wanted to know whether this is normal before digging into the code and
>>>> debugging the problem.
>>>>
>>>> Thanks
>>>>
>>>
