Hi Vishal, based on the error message and the behavior you described, introducing a filter for late events is the way to go - just as described in the SO thread you mentioned. Usually, you would collect late events in some kind of side output [1].
I hope that helps. Matthias [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#getting-late-data-as-a-side-output On Thu, Apr 22, 2021 at 3:22 PM Vishal Santoshi <vishal.santo...@gmail.com> wrote: > I saw > https://stackoverflow.com/questions/57334257/the-end-timestamp-of-an-event-time-window-cannot-become-earlier-than-the-current. > and this seems to suggest a straight up filter, but I am not sure how does > that filter works as in would it factor is the lateness when filtering ? > > On Thu, Apr 22, 2021 at 8:24 AM Vishal Santoshi <vishal.santo...@gmail.com> > wrote: > >> Well it was not a solution after all. We now have a session window that >> is stuck with the same issue albeit after the additional lateness. I had >> increased the lateness to 2 days and that masked the issue which again >> reared it's head after the 2 days ;lateness was over ( instead of the 1 day >> ) before. This is very disconcerting. >> >> Caused by: java.lang.UnsupportedOperationException: The end timestamp of >> an event-time window cannot become earlier than the current watermark by >> merging. Current watermark: 1619053742129 window: TimeWindow{start= >> 1618877773663, end=1618879580402} >> >> >> >> On Wed, Apr 21, 2021 at 7:05 PM Vishal Santoshi < >> vishal.santo...@gmail.com> wrote: >> >>> Hey folks, >>> I had a pipe with sessionization restarts and then fail >>> after retries with this exception. The only thing I had done was to >>> increase the lateness by 12 hours ( to a day ) in this pipe and restart >>> from SP and it ran for 12 hours plus without issue. I cannot imagine that >>> increasing the lateness created this and the way I solved this was to >>> increase the lateness further. Could this be if there are TMs in the >>> cluster whose time is off ( as in not synchronized ) ? >>> >>> 2021-04-21 11:27:58 >>> java.lang.UnsupportedOperationException: The end timestamp of an >>> event-time window cannot become earlier than the current watermark by >>> merging. Current watermark: 1618966593999 window: TimeWindow{start= >>> 1618878336107, end=1618880140466} >>> at org.apache.flink.streaming.runtime.operators.windowing. >>> WindowOperator$2.merge(WindowOperator.java:339) >>> at org.apache.flink.streaming.runtime.operators.windowing. >>> WindowOperator$2.merge(WindowOperator.java:321) >>> at org.apache.flink.streaming.runtime.operators.windowing. >>> MergingWindowSet.addWindow(MergingWindowSet.java:209) >>> at org.apache.flink.streaming.runtime.operators.windowing. >>> WindowOperator.processElement(WindowOperator.java:319) >>> at org.apache.flink.streaming.runtime.tasks. >>> OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask >>> .java:191) >>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput >>> .processElement(StreamTaskNetworkInput.java:204) >>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput >>> .emitNext(StreamTaskNetworkInput.java:174) >>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor >>> .processInput(StreamOneInputProcessor.java:65) >>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput( >>> StreamTask.java:396) >>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor >>> .runMailboxLoop(MailboxProcessor.java:191) >>> at org.apache.flink.streaming.runtime.tasks.StreamTask >>> .runMailboxLoop(StreamTask.java:617) >>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke( >>> StreamTask.java:581) >>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) >>> at java.lang.Thread.run(Thread.java:748) >>> >>> >>>