Hi Vishal,
Based on the error message and the behavior you described, introducing a
filter for late events is the way to go, just as described in the Stack Overflow
thread you mentioned. Usually, you would collect late events in some kind
of side output [1].
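Below is a minimal sketch of such a filter. It is plain Java so it runs without Flink on the classpath. It assumes an event-time session window with a 30-minute gap and 1 day of allowed lateness; both values are assumptions, so use the job's real ones. The class and method names are hypothetical.

The check is intended to mirror the one behind the exception. An event at timestamp ts opens the session window [ts, ts + gap), whose max timestamp is ts + gap - 1. That window counts as late once max timestamp + allowed lateness <= current watermark. So the filter would need to factor in both the gap and the allowed lateness, not just the raw event timestamp.

In a real job, the same test would presumably sit in a ProcessFunction placed before keyBy(...).window(...). It would read ctx.timestamp() and ctx.timerService().currentWatermark(), and emit late events with ctx.output(lateTag, value) instead of appending to a list. Because that function runs upstream of the window operator, its watermark should be at least as far along as the window operator's when the event gets there. The check therefore errs on the side of diverting events.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java model of a pre-window lateness filter (hypothetical name).
 * The two lists stand in for the main stream and a side output.
 */
public class LateEventFilter {

    private final long sessionGapMs;      // assumed session gap
    private final long allowedLatenessMs; // assumed allowed lateness

    final List<Long> mainOutput = new ArrayList<>();
    final List<Long> lateOutput = new ArrayList<>(); // models the side output

    LateEventFilter(long sessionGapMs, long allowedLatenessMs) {
        this.sessionGapMs = sessionGapMs;
        this.allowedLatenessMs = allowedLatenessMs;
    }

    /**
     * An event at ts opens the session window [ts, ts + gap); its max
     * timestamp is ts + gap - 1. Intended to mirror the window lateness
     * condition: late if maxTimestamp + allowedLateness <= watermark.
     */
    boolean isTooLate(long ts, long watermark) {
        long windowMaxTimestamp = ts + sessionGapMs - 1;
        return windowMaxTimestamp + allowedLatenessMs <= watermark;
    }

    /** Routes an event either to the main output or to the late output. */
    void process(long ts, long watermark) {
        if (isTooLate(ts, watermark)) {
            lateOutput.add(ts);
        } else {
            mainOutput.add(ts);
        }
    }

    public static void main(String[] args) {
        long gap = 30 * 60 * 1000L;           // assumption: 30-minute gap
        long lateness = 24 * 60 * 60 * 1000L; // 1 day, as in the original job
        LateEventFilter filter = new LateEventFilter(gap, lateness);

        long watermark = 1618966593999L;                // watermark from the logged exception
        filter.process(1618878336107L, watermark);      // window start from the same log
        filter.process(watermark - 1000L, watermark);   // a fresh, on-time event
        System.out.println("main=" + filter.mainOutput + " late=" + filter.lateOutput);
    }
}
```

Diverting late events into their own output, rather than silently dropping them, keeps them available for inspection or reprocessing.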

I hope that helps.
Matthias

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output

On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> I saw
> https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current.
> This seems to suggest a straight-up filter, but I am not sure how that
> filter works: would it factor in the lateness when filtering?
>
> On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> Well, it was not a solution after all. We now have a session window that
>> is stuck with the same issue, albeit after the additional lateness. I had
>> increased the lateness to 2 days, and that masked the issue, which reared
>> its head again once the 2 days of lateness were over (instead of the 1 day
>> before). This is very disconcerting.
>>
>> Caused by: java.lang.UnsupportedOperationException: The end timestamp of
>> an event-time window cannot become earlier than the current watermark by
>> merging. Current watermark: 1619053742129 window:
>> TimeWindow{start=1618877773663, end=1618879580402}
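A quick arithmetic check on the two logged exceptions may help. The 1-day case is the one quoted further down. The check assumes that a merge is rejected when the merged window's max timestamp (end - 1) plus the allowed lateness is at or before the current watermark. Both logged values are consistent with that condition. MergeLatenessCheck is a hypothetical helper.

In both cases the window is about 30 minutes long and ends roughly one lateness period before the watermark: 2 days plus about 23 minutes, and 1 day plus about 54 seconds. Raising the lateness therefore appears to postpone the failure rather than remove it.

```java
import java.time.Duration;

/** Re-checks the two logged exceptions against the assumed merge condition. */
public class MergeLatenessCheck {

    /**
     * TimeWindow's max timestamp is end - 1. Assumed rejection condition:
     * maxTimestamp + allowedLateness <= currentWatermark.
     */
    static boolean mergeRejected(long windowEnd, long allowedLatenessMs, long watermark) {
        return (windowEnd - 1) + allowedLatenessMs <= watermark;
    }

    /** Prints window length, how far the watermark is past the window end, and the verdict. */
    static void report(String label, long start, long end, long latenessMs, long watermark) {
        Duration length = Duration.ofMillis(end - start);
        Duration behind = Duration.ofMillis(watermark - end);
        System.out.printf("%s: window length %s, watermark past window end by %s, rejected=%b%n",
                label, length, behind, mergeRejected(end, latenessMs, watermark));
    }

    public static void main(String[] args) {
        long oneDay = Duration.ofDays(1).toMillis();
        // Values copied from the two exceptions in this thread.
        report("2-day lateness", 1618877773663L, 1618879580402L, 2 * oneDay, 1619053742129L);
        report("1-day lateness", 1618878336107L, 1618880140466L, oneDay, 1618966593999L);
    }
}
```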
>>
>>
>>
>> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hey folks,
>>> I had a pipeline with sessionization that restarted and then failed
>>> after retries with this exception. The only thing I had done was to
>>> increase the lateness by 12 hours (to a day) in this pipeline and restart
>>> from a savepoint, and it ran for 12+ hours without issue. I cannot
>>> imagine that increasing the lateness caused this, and the way I solved it
>>> was to increase the lateness further. Could this happen if there are
>>> TaskManagers (TMs) in the cluster whose clocks are off (as in not
>>> synchronized)?
>>>
>>> 2021-04-21 11:27:58
>>> java.lang.UnsupportedOperationException: The end timestamp of an event-time window cannot become earlier than the current watermark by merging. Current watermark: 1618966593999 window: TimeWindow{start=1618878336107, end=1618880140466}
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:339)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:321)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:209)
>>>     at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:319)
>>>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>     at java.lang.Thread.run(Thread.java:748)
>>>
>>>
>>>
