Thanks for correcting me, Piotr. I didn't look closely enough at the code.
With the currently implemented logic, a record should not be emitted to a
side output if its window hasn't been closed yet.
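
For reference, the relevant check at the end of WindowOperator#processElement
looks roughly like this (paraphrased from memory, not verbatim source): the
element only reaches the late side output if it was skipped by every window it
was assigned to AND it is late relative to the current watermark.

if (isSkippedElement && isElementLate(element)) {
    if (lateDataOutputTag != null) {
        sideOutput(element);                  // emitted to the user's late-data tag
    } else {
        this.numLateRecordsDropped.inc();     // otherwise just counted and dropped
    }
}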

2018-05-11 14:13 GMT+02:00 Piotr Nowojski <pi...@data-artisans.com>:

> Generally speaking, best practice is always to simplify your program as
> much as possible to narrow down the scope of the search. Replace the data
> source with statically generated events, remove unnecessary components, etc.
> Either such a process helps you figure out what's wrong on your own, or, if
> not, sharing such a minimal program that reproduces the issue will allow us
> to debug it.
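>
> For example, a minimal, self-contained job along these lines (just a sketch;
> the event values and class name are made up) often narrows things down
> quickly:
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
> import org.apache.flink.streaming.api.windowing.time.Time;
>
> public class MinimalRepro {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>         // Statically generated (key, timestamp) events replace the Kafka source.
>         env.fromElements(
>                 Tuple2.of("a", 1000L),
>                 Tuple2.of("a", 7000L),
>                 Tuple2.of("a", 2000L))   // 5 s out of order w.r.t. the previous element
>             .assignTimestampsAndWatermarks(
>                 new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
>                     @Override
>                     public long extractTimestamp(Tuple2<String, Long> element) {
>                         return element.f1;
>                     }
>                 })
>             .print();
>
>         env.execute("minimal-repro");
>     }
> }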
>
> Piotrek
>
>
> On 11 May 2018, at 13:54, Juho Autio <juho.au...@rovio.com> wrote:
>
> Thanks for that code snippet, I should try it out to simulate my DAG. If
> you have any suggestions on how to further debug what's causing late data
> on a production stream job, please let me know.
>
> On Fri, May 11, 2018 at 2:18 PM, Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
>> Hey,
>>
>> Actually, I think Fabian's initial message was incorrect. As far as I can
>> see in the code of WindowOperator (the last lines of
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#processElement),
>> an element is sent to the late side output only if it is late AND it wasn't
>> assigned to any of the existing windows (because those were late as well).
>> In other words, it should work as you, Juho, are wishing: elements should
>> be marked as late only once they are overdue/late for the window, after one
>> full day.
>>
>> I have tested it and it works as expected. The following program:
>>
>> https://gist.github.com/pnowojski/8cd650170925cf35be521cf236f1d97a
>>
>> prints only ONE number to standard err:
>>
>> > 1394
>>
>> and there is nothing on the side output.
>>
>> Piotrek
>>
>> On 11 May 2018, at 12:32, Juho Autio <juho.au...@rovio.com> wrote:
>>
>> Thanks. What I still don't get is why my message got filtered in the
>> first place. Even if the allowed-lateness filtering were done "on the
>> window", data should not be dropped as late if it's not in fact late by
>> more than the allowedLateness setting.
>>
>> Assuming that these conditions hold:
>> - messages (and thus the extracted timestamps) were not out of order by
>> more than 5 seconds (assuming I didn't make any mistake in my
>> partition-level analysis)
>> - allowedLateness = 1 minute
>> - watermarks are assigned on the Kafka consumer, meaning that they are
>> synchronized across all partitions
>>
>> I don't see how the watermark could ever have been more than 5 seconds
>> ahead when the message arrives at the isElementLate filter. Do you have
>> any idea on this? Is there some existing test that simulates out-of-order
>> input to Flink's Kafka consumer? I could try to build a test case based on
>> that to possibly reproduce my problem. I'm not sure how to gather enough
>> debug information on the production stream so that it would clearly show
>> the watermarks, how they progressed on each Kafka partition and later in
>> the chain in case isElementLate filters something.
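>>
>> One low-tech option I'm considering (a sketch only; Event stands in for my
>> actual type) is a pass-through ProcessFunction in front of the window that
>> prints each element's timestamp next to the operator's current watermark:
>>
>> stream.process(new ProcessFunction<Event, Event>() {
>>     @Override
>>     public void processElement(Event value, Context ctx, Collector<Event> out) {
>>         long watermark = ctx.timerService().currentWatermark();
>>         // An element at or behind the watermark is a candidate for lateness.
>>         if (ctx.timestamp() != null && ctx.timestamp() <= watermark) {
>>             System.err.println("ts=" + ctx.timestamp() + " behind watermark=" + watermark);
>>         }
>>         out.collect(value);  // pass the element through unchanged
>>     }
>> })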
>>
>> On Fri, May 11, 2018 at 12:12 PM, Fabian Hueske <fhue...@gmail.com>
>> wrote:
>>
>>> Hi Juho,
>>>
>>> Thanks for bringing up this topic! I share your intuition.
>>> IMO, records should only be filtered out and sent to a side output if
>>> any of the windows they would be assigned to is already closed.
>>>
>>> I had a look into the code and found that records are filtered out as
>>> late based on the following condition:
>>>
>>> protected boolean isElementLate(StreamRecord<IN> element) {
>>>     return (windowAssigner.isEventTime()) &&
>>>         (element.getTimestamp() + allowedLateness <=
>>>             internalTimerService.currentWatermark());
>>> }
>>>
>>>
>>> This code shows that your analysis is correct.
>>> Records are filtered out based on their timestamp and the current
>>> watermark, even though they arrive before the window is closed.
>>>
>>> OTOH, filtering out records based on the windows they would end up in can
>>> also be tricky if records are assigned to multiple windows (e.g., sliding
>>> windows).
>>> In this case, a side-outputted record could still belong to some windows
>>> and not to others.
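>>>
>>> To give a concrete (made-up) example: with sliding windows of size 10
>>> minutes and slide 5 minutes, an element with timestamp 12:04 belongs to
>>> both [11:55, 12:05) and [12:00, 12:10). Once the watermark passes 12:05
>>> plus the allowed lateness, the first window is closed while the second is
>>> still open, so the element is "late" for one window but on time for the
>>> other.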
>>>
>>> @Aljoscha (CC) might have an explanation for the current behavior.
>>>
>>> Thanks,
>>> Fabian
>>>
>>>
>>> 2018-05-11 10:55 GMT+02:00 Juho Autio <juho.au...@rovio.com>:
>>>
>>>> I don't understand why some of my data is discarded as late on my Flink
>>>> stream job a long time before the window even closes.
>>>>
>>>> I cannot be 100% sure, but to me it seems like the Kafka consumer is
>>>> basically causing the data to be dropped as "late", not the window. I
>>>> didn't expect this to ever happen.
>>>>
>>>> I have a Flink stream job that gathers distinct values using a 24-hour
>>>> window. It reads the data from Kafka, using a
>>>> BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
>>>> synchronize watermarks across all Kafka partitions. The maxOutOfOrderness
>>>> of the extractor is set to 10 seconds.
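>>>>
>>>> For reference, the setup looks roughly like this (topic name,
>>>> deserialization schema, and Event type are placeholders for my actual
>>>> ones):
>>>>
>>>> FlinkKafkaConsumer010<Event> consumer = new FlinkKafkaConsumer010<>(
>>>>         "events", new EventDeserializationSchema(), kafkaProps);
>>>>
>>>> // Assigning the extractor on the consumer itself makes Flink track
>>>> // watermarks per Kafka partition and emit the minimum across partitions.
>>>> consumer.assignTimestampsAndWatermarks(
>>>>         new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(10)) {
>>>>             @Override
>>>>             public long extractTimestamp(Event event) {
>>>>                 return event.getTimestamp();  // event time in epoch millis
>>>>             }
>>>>         });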
>>>>
>>>> I have also enabled allowedLateness with 1 minute lateness on the
>>>> 24-hour window:
>>>>
>>>> .timeWindow(Time.days(1))
>>>> .allowedLateness(Time.minutes(1))
>>>> .sideOutputLateData(lateDataTag)
>>>> .reduce(new DistinctFunction())
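>>>>
>>>> (For completeness, the tag and the side output are wired up roughly like
>>>> this; Event and getKey stand in for my actual types:)
>>>>
>>>> // The tag must be an anonymous subclass so the element type is preserved.
>>>> final OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data") {};
>>>>
>>>> SingleOutputStreamOperator<Event> distinct = events
>>>>     .keyBy(Event::getKey)
>>>>     .timeWindow(Time.days(1))
>>>>     .allowedLateness(Time.minutes(1))
>>>>     .sideOutputLateData(lateDataTag)
>>>>     .reduce(new DistinctFunction());
>>>>
>>>> // Anything dropped as late by the window operator shows up here.
>>>> distinct.getSideOutput(lateDataTag).print();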
>>>>
>>>> I have used accumulators to see that there is some late data; there
>>>> have been multiple occurrences of this.
>>>>
>>>> Now focusing on a particular case that I was investigating more
>>>> closely: around ~12:15 my late-data accumulator started showing that 1
>>>> message had been late. That's in the middle of the time window, so why
>>>> would this happen? I would expect late data to be discarded only sometime
>>>> after 00:01, if some data arrives late for the window that just closed at
>>>> 00:00 and doesn't get emitted as part of the 1-minute allowedLateness.
>>>>
>>>> To analyze the timestamps, I read all messages in sequence separately
>>>> from each Kafka partition and calculated the difference in timestamps
>>>> between consecutive messages. I had exactly one message categorized as
>>>> late by Flink in this case, and at the time I was using maxOutOfOrderness
>>>> = 5 seconds. I found exactly one message in one Kafka partition where the
>>>> timestamp difference between consecutive messages was 5 seconds (they
>>>> were out of order by 5 s), which makes me wonder: did Flink drop the
>>>> event as late because it violated maxOutOfOrderness? Have I misunderstood
>>>> the concept of late data somehow? I only expected late data to happen on
>>>> window operations. I would expect the Kafka consumer to pass "late"
>>>> messages onward even though the watermark doesn't change.
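>>>>
>>>> (My understanding of the arithmetic, which may be where I'm going wrong:
>>>> with BoundedOutOfOrdernessTimestampExtractor the watermark trails the
>>>> highest timestamp seen, i.e. watermark = maxTimestamp - maxOutOfOrderness.
>>>> So if the max timestamp seen is 12:15:10 and maxOutOfOrderness = 5 s, the
>>>> watermark is 12:15:05, and a message with timestamp 12:15:05 or earlier is
>>>> behind it. But the window operator should only side-output an element if
>>>> timestamp + allowedLateness <= watermark, i.e. if it's more than a full
>>>> minute behind with my settings, which a 5-second reordering alone
>>>> shouldn't cause.)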
>>>>
>>>> Thank you very much if you can find the time to look at this!
>>>>
>>>
>>>
>>
>>
>
>
>
