Thanks for that code snippet; I'll try it out to simulate my DAG. If you have any suggestions on how to debug further what's causing late data in a production streaming job, please let me know.
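
Meanwhile, here is the kind of minimal simulation I have in mind, roughly in the spirit of your gist. To be clear, this is only a sketch and not my actual DAG: the element values, the punctuated watermark assigner, the 10-second window, and the max() reduce are stand-ins picked for illustration. With these inputs, element 1000 arrives within allowedLateness and triggers a late firing, while element 2000 arrives after the window's cleanup time and should be the only record on the late side output:

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;

    public class LateDataSimulation {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1); // keep watermark progression deterministic

            // Each element doubles as its own event-time timestamp in milliseconds.
            DataStream<Long> input = env
                    .fromElements(5_000L, 20_000L, 1_000L, 80_000L, 2_000L)
                    .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Long>() {
                        @Override
                        public long extractTimestamp(Long element, long previousTimestamp) {
                            return element;
                        }

                        @Override
                        public Watermark checkAndGetNextWatermark(Long element, long timestamp) {
                            // Trail each element by 5 s, like a bounded-out-of-orderness
                            // extractor would, but emitted deterministically per element.
                            return new Watermark(timestamp - 5_000);
                        }
                    });

            final OutputTag<Long> lateDataTag = new OutputTag<Long>("late-data") {};

            SingleOutputStreamOperator<Long> windowed = input
                    .timeWindowAll(Time.seconds(10))   // stand-in for the real 24 h window
                    .allowedLateness(Time.minutes(1))
                    .sideOutputLateData(lateDataTag)
                    .reduce((a, b) -> Math.max(a, b)); // stand-in for DistinctFunction

            windowed.print();                                 // window results, incl. late firings
            windowed.getSideOutput(lateDataTag).printToErr(); // records dropped as late

            env.execute("late-data-simulation");
        }
    }

I went with a punctuated assigner so the watermark advances deterministically with each element; with the periodic BoundedOutOfOrdernessTimestampExtractor, watermark emission depends on the auto-watermark interval, which would make a test this small flaky.
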
On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hey,
>
> Actually I think Fabian's initial message was incorrect. As far as I can
> see in the code of WindowOperator (last lines of
> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement),
> the element is sent to the late side output if it is late AND it wasn't
> assigned to any of the existing windows (because they were late as well).
> In other words, it should work as you, Juho, are wishing: elements should
> be marked as late only once they are overdue/late for the window, after
> one full day.
>
> I have tested it and it works as expected. The following program:
>
> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>
> prints only ONE number to standard err:
>
> > 1394
>
> And there is nothing on the side output.
>
> Piotrek
>
> On 11 May 2018, at 12:32, Juho Autio <juho.au...@rovio.com> wrote:
>
> Thanks. What I still don't get is why my message got filtered in the
> first place. Even if the allowed-lateness filtering were done "on the
> window", data should not be dropped as late if it is not in fact late by
> more than the allowedLateness setting.
>
> Assuming that these conditions hold:
> - messages (and thus the extracted timestamps) were not out of order by
> more than 5 seconds (as far as I didn't make any mistake in my
> partition-level analysis)
> - allowedLateness = 1 minute
> - watermarks are assigned on the kafka consumer, meaning that they are
> synchronized across all partitions
>
> I don't see how the watermark could ever have been more than 5 seconds
> ahead when the message arrives at the isElementLate filter. Do you have
> any idea on this? Is there some existing test that simulates out-of-order
> input to Flink's kafka consumer? I could try to build a test case based
> on that to possibly reproduce my problem. I'm not sure how to gather
> enough debug information on the production stream so that it would
> clearly show the watermarks, and how they progressed on each kafka
> partition and later in the chain, in case isElementLate filters something.
>
> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Juho,
>>
>> Thanks for bringing up this topic! I share your intuition.
>> IMO, records should only be filtered out and sent to a side output if
>> any of the windows they would be assigned to is closed already.
>>
>> I had a look into the code and found that records are filtered out as
>> late based on the following condition:
>>
>> protected boolean isElementLate(StreamRecord<IN> element) {
>>     return (windowAssigner.isEventTime()) &&
>>         (element.getTimestamp() + allowedLateness <=
>>             internalTimerService.currentWatermark());
>> }
>>
>> This code shows that your analysis is correct.
>> Records are filtered out based on their timestamp and the current
>> watermark, even though they arrive before the window is closed.
>>
>> OTOH, filtering out records based on the window they would end up in
>> can also be tricky if records are assigned to multiple windows (e.g.,
>> sliding windows).
>> In this case, a side-outputted record could still be in some windows
>> and not in others.
>>
>> @Aljoscha (CC) might have an explanation for the current behavior.
>>
>> Thanks,
>> Fabian
>>
>>
>> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>>
>>> I don't understand why I'm getting some data discarded as late on my
>>> Flink stream job a long time before the window even closes.
>>>
>>> I cannot be 100% sure, but to me it seems like the kafka consumer is
>>> basically causing the data to be dropped as "late", not the window. I
>>> didn't expect this to ever happen.
>>>
>>> I have a Flink stream job that gathers distinct values using a 24-hour
>>> window. It reads the data from Kafka, using a
>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>> synchronize watermarks across all kafka partitions. The
>>> maxOutOfOrderness of the extractor is set to 10 seconds.
>>>
>>> I have also enabled allowedLateness with 1 minute of lateness on the
>>> 24-hour window:
>>>
>>> .timeWindow(Time.days(1))
>>> .allowedLateness(Time.minutes(1))
>>> .sideOutputLateData(lateDataTag)
>>> .reduce(new DistinctFunction())
>>>
>>> I have used accumulators to see that there is some late data, and there
>>> have been multiple occurrences of it.
>>>
>>> Now, focusing on a particular case that I investigated more closely:
>>> around 12:15 o'clock my late-data accumulator started showing that 1
>>> message had been late. That's in the middle of the time window, so why
>>> would this happen? I would expect late data to be discarded only some
>>> time after 00:01, if some data arrives late for the window that just
>>> closed at 00:00 and doesn't get emitted as part of the 1 minute of
>>> allowedLateness.
>>>
>>> To analyze the timestamps, I read all messages in sequence separately
>>> from each kafka partition and calculated the difference in timestamps
>>> between consecutive messages. I had exactly one message categorized as
>>> late by Flink in this case, and at the time I was using
>>> maxOutOfOrderness = 5 seconds. I found exactly one message in one kafka
>>> partition where the timestamp difference between consecutive messages
>>> was 5 seconds (they were out of order by 5 s), which makes me wonder:
>>> did Flink drop the event as late because it violated maxOutOfOrderness?
>>> Have I misunderstood the concept of late data somehow? I only expected
>>> late data to happen on window operations. I would expect the kafka
>>> consumer to pass "late" messages onward even though the watermark
>>> doesn't change.
>>>
>>> Thank you very much if you can find the time to look at this!
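
For reference, the watermark assignment on the kafka consumer that I described above looks roughly like this. It is a simplified sketch rather than the real job: the topic name, connection properties, and String records carrying a plain epoch-millis payload are all illustrative; only the maxOutOfOrderness = 10 seconds matches what I described:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    public class KafkaWatermarkSetup {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // illustrative
            props.setProperty("group.id", "distinct-values-job");     // illustrative

            FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010<>(
                    "events", new SimpleStringSchema(), props);       // topic name is illustrative

            // Setting the extractor on the consumer (not on the DataStream) makes
            // Flink track a watermark per Kafka partition and emit the minimum
            // across partitions downstream.
            consumer.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(10)) {
                        @Override
                        public long extractTimestamp(String record) {
                            // Assumes the record is its event time as epoch millis;
                            // the real job parses its own record format.
                            return Long.parseLong(record);
                        }
                    });

            DataStream<String> stream = env.addSource(consumer);
            stream.print();

            env.execute("kafka-per-partition-watermarks");
        }
    }

Because the extractor is set on the consumer itself rather than on the resulting DataStream, the watermark forwarded downstream is the minimum over all subscribed partitions, which is what keeps the partitions synchronized.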