Thanks

On Wed, Oct 7, 2020 at 7:06 PM Till Rohrmann <trohrm...@apache.org> wrote:

> Hi Ori,
>
> you are right. Events are being sent down the side output for late events
> if the event's timestamp + the allowed lateness is smaller than the current
> watermark. These events are directly seen by downstream operators which
> consume the side output for late events.
>
> Cheers,
> Till
>
> On Wed, Oct 7, 2020 at 2:32 PM Ori Popowski <ori....@gmail.com> wrote:
>
>> After creating a toy example I think that I've got the concept of
>> lateDataOutput wrong.
>>
>> It seems that the lateDataSideOutput has nothing to do with windowing;
>> when events arrive late they'll just go straight to the side output, and
>> there can never be any window firing of the main flow for that specific key.
>>
>> On Wed, Oct 7, 2020 at 2:42 PM Ori Popowski <ori....@gmail.com> wrote:
>>
>>> I've made an experiment where I use an evictor on the main window (not
>>> the late one), only to write a debug file when the window fires (I don't
>>> actually evict events, I've made it so I can write a debug object the
>>> moment the window finishes).
>>>
>>> I can see that indeed the late data window fires before the main window,
>>> since the mentioned debug file does not exist, but late events _do_ exist
>>> in the destination.
>>>
>>> Writing this debug object in the evictor eliminates potential problems
>>> that might be due to logic in the process function, and it proves that the
>>> window of the late events indeed fires before the main window.
>>>
>>> Here's an outline of my job:
>>>
>>> val windowedStream = senv
>>>   .addSource(kafkaSource)
>>>   ... // some operators
>>>   // like BoundedOutOfOrderness but ignore future timestamps
>>>   .assignTimestampsAndWatermarks(new IgnoreFutureTimestamps(10.minutes))
>>>   ... // some more operators
>>>   .keyingBy { case (meta, _) => meta.toPath }
>>>   .window(EventTimeSessionWindows.withGap(Time.minutes(30))) // "main" window
>>>   .sideOutputLateData(lateDataTag)
>>>   .process(new ProcessSession(sessionPlayback, config))
>>>
>>> windowedStream
>>>   .map(new SerializeSession(sessionPlayback))
>>>   .addSink(sink)
>>>
>>> windowedStream
>>>   .getSideOutput(lateDataTag)
>>>   .keyingBy { case (meta, _) => meta.toPath }
>>>   .window(TumblingProcessingTimeWindows.of(Time.minutes(30))) // "late" window
>>>   .process(new ProcessSession(sessionPlayback, config, true))
>>>   .map(new SerializeSession(sessionPlayback, late = true))
>>>
>>> So, to repeat the question, is that normal? And if not - how can I fix
>>> this?
>>>
>>> Thanks
>>>
>>> On Tue, Oct 6, 2020 at 3:44 PM Ori Popowski <ori....@gmail.com> wrote:
>>>
>>>> I have a job with event-time session window of 30 minutes.
>>>>
>>>> I output late events to side output, where I have a tumbling processing
>>>> time window of 30 minutes.
>>>>
>>>> I observe that the late events are written to storage before the "main"
>>>> events.
>>>>
>>>> I wanted to know if it's normal before digging into the code and
>>>> debugging the problem.
>>>>
>>>> Thanks
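
For anyone finding this thread later, the rule Till describes can be sketched in plain Scala with no Flink dependency. This is only a simplified model of the check as worded above, not Flink's actual operator code. `Event`, `isLate` and `route` are hypothetical names made up for illustration, and the strict "smaller than" comparison follows the wording in the thread, so the real operator may treat the exact boundary differently.

```scala
// Simplified model of the lateness rule described in this thread.
// All names here are hypothetical; none of this is Flink API.
object LatenessSketch {
  // Hypothetical event type: a key plus an event-time timestamp in milliseconds.
  final case class Event(key: String, timestampMs: Long)

  // An event counts as late when its timestamp plus the allowed lateness
  // is smaller than the current watermark (rule as worded in the thread).
  def isLate(event: Event, allowedLatenessMs: Long, watermarkMs: Long): Boolean =
    event.timestampMs + allowedLatenessMs < watermarkMs

  // Splits events into (main, late) for a fixed watermark.
  // Late events skip the main window and go straight to the side output.
  def route(
      events: Seq[Event],
      allowedLatenessMs: Long,
      watermarkMs: Long
  ): (Seq[Event], Seq[Event]) = {
    val (late, main) = events.partition(isLate(_, allowedLatenessMs, watermarkMs))
    (main, late)
  }
}
```

With a watermark of 1000 and no allowed lateness, `LatenessSketch.route(Seq(Event("a", 900L), Event("b", 1000L)), 0L, 1000L)` puts "a" in the late group and "b" in the main group. Since late events never enter the event-time session window, the processing-time window on the side output can fire on wall-clock time before the main window does, which matches what Ori observed.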