Thanks for correcting me, Piotr. I didn't look closely enough at the code. With the presently implemented logic, a record should not be emitted to a side output if its window hasn't been closed yet.
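For my own notes, below is a minimal, self-contained sketch of the kind of test Piotrek describes (my own approximation, not the contents of his gist; the class name, element values and the punctuated watermark assigner are made up for illustration). The third element arrives well behind the watermark, yet nothing should reach the late side output because its daily window is still open:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

public class LateSideOutputSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Side output tag for late data, as in my job.
        final OutputTag<Long> lateTag = new OutputTag<Long>("late-data") {};

        // Element values double as event timestamps (ms). The third element (10 000)
        // arrives far behind the watermark (which is at 195 000 by then), so the plain
        // isElementLate() check alone would consider it late, but its daily window is
        // still open, so it must be added to the window, not side-outputted.
        SingleOutputStreamOperator<Long> result = env
                .fromElements(5_000L, 200_000L, 10_000L)
                .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
                    @Override
                    public long extractTimestamp(Long element, long previousTimestamp) {
                        return element;
                    }

                    @Override
                    public Watermark checkAndGetNextWatermark(Long element, long extractedTimestamp) {
                        // Emit a watermark after every element, 5 s behind it, so the
                        // watermark advances deterministically between elements.
                        return new Watermark(extractedTimestamp - 5_000L);
                    }
                })
                .timeWindowAll(Time.days(1))
                .allowedLateness(Time.minutes(1))
                .sideOutputLateData(lateTag)
                .reduce(new ReduceFunction<Long>() {
                    @Override
                    public Long reduce(Long a, Long b) {
                        return Math.min(a, b);
                    }
                });

        result.print();                                // expected: a single window result (5000)
        result.getSideOutput(lateTag).printToErr();    // expected: nothing
        env.execute("late side output sketch");
    }
}

Conversely, if I push the second element's timestamp far enough past the end of the first day (beyond the allowed lateness plus the 5 s watermark lag), the 10 000 element's window is already gone by the time it arrives, and it should land on the side output instead, which matches Piotrek's description.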
2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:

> Generally speaking, best practice is always to simplify your program as much as possible to narrow down the scope of the search: replace the data source with statically generated events, remove unnecessary components, etc. Either such a process helps you figure out what's wrong on your own, or, if not, sharing such a minimal program that reproduces the issue with us will allow us to debug it.
>
> Piotrek
>
> On 11 May 2018, at 13:54, Juho Autio <juho.au...@rovio.com> wrote:
>
> Thanks for that code snippet, I should try it out to simulate my DAG. If you have any suggestions on how to debug further what's causing late data on a production stream job, please let me know.
>
> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>
>> Hey,
>>
>> Actually, I think Fabian's initial message was incorrect. As far as I can see in the code of WindowOperator (the last lines of org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement), the element is sent to the late side output if it is late AND it wasn't assigned to any of the existing windows (because they were late as well). In other words, it should work as you, Juho, are wishing: elements should be marked as late only once they are overdue/late for the window, i.e. after one full day.
>>
>> I have tested it and it works as expected. The following program:
>>
>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>
>> prints only ONE number to standard err:
>>
>> > 1394
>>
>> and there is nothing on the side output.
>>
>> Piotrek
>>
>> On 11 May 2018, at 12:32, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks. What I still don't get is why my message got filtered in the first place. Even if the allowed-lateness filtering were done "on the window", data should not be dropped as late if it's not in fact late by more than the allowedLateness setting.
>>
>> Assuming that these conditions hold:
>> - messages (and thus the extracted timestamps) were not out of order by more than 5 seconds (as far as I didn't make any mistake in my partition-level analysis)
>> - allowedLateness = 1 minute
>> - watermarks are assigned on the Kafka consumer, meaning that they are synchronized across all partitions
>>
>> I don't see how the watermark could ever have been more than 5 seconds ahead when the message arrived at the isElementLate filter. Do you have any idea on this? Is there some existing test that simulates out-of-order input to Flink's Kafka consumer? I could try to build a test case based on that to possibly reproduce my problem. I'm not sure how to gather enough debug information on the production stream so that it would clearly show the watermarks and how they progressed on each Kafka partition & later in the chain in case isElementLate filters something.
>>
>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Juho,
>>>
>>> Thanks for bringing up this topic! I share your intuition. IMO, records should only be filtered out and sent to a side output if any of the windows they would be assigned to is closed already.
>>>
>>> I had a look into the code and found that records are filtered out as late based on the following condition:
>>>
>>> protected boolean isElementLate(StreamRecord<IN> element) {
>>>     return (windowAssigner.isEventTime()) &&
>>>         (element.getTimestamp() + allowedLateness <=
>>>             internalTimerService.currentWatermark());
>>> }
>>>
>>> This code shows that your analysis is correct. Records are filtered out based on their timestamp and the current watermark, even though they arrive before the window is closed.
>>>
>>> OTOH, filtering out records based on the window they would end up in can also be tricky if records are assigned to multiple windows (e.g., sliding windows). In this case, a side-outputted record could still be in some windows and not in others.
>>>
>>> @Aljoscha (CC) might have an explanation for the current behavior.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>>>
>>>> I don't understand why I'm getting some data discarded as late on my Flink stream job a long time before the window even closes.
>>>>
>>>> I can not be 100% sure, but to me it seems like the Kafka consumer is basically causing the data to be dropped as "late", not the window. I didn't expect this to ever happen.
>>>>
>>>> I have a Flink stream job that gathers distinct values using a 24-hour window. It reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to synchronize watermarks across all Kafka partitions. The maxOutOfOrderness of the extractor is set to 10 seconds.
>>>>
>>>> I have also enabled allowedLateness with 1 minute of lateness on the 24-hour window:
>>>>
>>>> .timeWindow(Time.days(1))
>>>> .allowedLateness(Time.minutes(1))
>>>> .sideOutputLateData(lateDataTag)
>>>> .reduce(new DistinctFunction())
>>>>
>>>> I have used accumulators to see that there is some late data; there have been multiple occurrences of this.
>>>>
>>>> Now, focusing on a particular case that I was investigating more closely: around 12:15 o'clock my late data accumulator started showing that 1 message had been late. That's in the middle of the time window – so why would this happen? I would expect late data to be discarded only some time after 00:01, if some data arrives late for the window that just closed at 00:00 and doesn't get emitted as part of the 1-minute allowedLateness.
>>>>
>>>> To analyze the timestamps I read all messages in sequence separately from each Kafka partition and calculated the difference in timestamps between consecutive messages. I had exactly one message categorized as late by Flink in this case, and at the time I was using maxOutOfOrderness = 5 seconds. I found exactly one message in one Kafka partition where the timestamp difference between consecutive messages was 5 seconds (they were out of order by 5 s), which makes me wonder: did Flink drop the event as late because it violated maxOutOfOrderness? Have I misunderstood the concept of late data somehow? I only expected late data to happen on window operations. I would expect the Kafka consumer to pass "late" messages onward even though the watermark doesn't advance.
>>>>
>>>> Thank you very much if you can find the time to look at this!
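PS: Regarding my open question above about how to gather debug information on watermark progress: one idea (an untested sketch of my own, not something taken from the Flink docs; the class name is made up) would be to splice a ProcessFunction in front of the window operator that logs each element's timestamp together with the current watermark at that point in the pipeline:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Logs elements that arrive behind the current watermark at this point in the pipeline.
// Note: whether the window operator actually side-outputs such an element also depends
// on allowedLateness and on whether the element's window is still open.
public class WatermarkLagLogger<T> extends ProcessFunction<T, T> {

    private static final Logger LOG = LoggerFactory.getLogger(WatermarkLagLogger.class);

    @Override
    public void processElement(T value, Context ctx, Collector<T> out) {
        long watermark = ctx.timerService().currentWatermark();
        Long timestamp = ctx.timestamp(); // null if no timestamp has been assigned yet
        if (timestamp != null && timestamp <= watermark) {
            LOG.warn("Element behind watermark by {} ms (timestamp={}, watermark={}): {}",
                    watermark - timestamp, timestamp, watermark, value);
        }
        out.collect(value);
    }
}

I would attach this between the timestamp assigner and .timeWindow(...), e.g. stream.process(new WatermarkLagLogger<>()) (depending on the event type, a .returns(...) hint may be needed). The log lines should then show how far the watermark had progressed whenever an element arrives behind it, just before the isElementLate check is applied.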