Hi, Thanks for reporting the issue. I think this might be caused by System.currentTimeMillis() not being monotonic [1] and the fact Flink is accessing this function per element multiple times (at least twice: first for creating a window, second to perform the check that has failed in your case), however I’m pretty sure that this is more general problem in more places.I have created a ticket for this. [2]
I’m not sure if there is an easy hot fix for that. You would have to increase inactivity gap, switch to ingestion/even time (anyway preferable), make sure that machine’s time doesn’t change or just ignore the problem and accept some failure from time to time. Piotrek [1] https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls <https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls> [2] https://issues.apache.org/jira/browse/FLINK-12872 <https://issues.apache.org/jira/browse/FLINK-12872> > On 14 Jun 2019, at 10:14, Abhishek Jain <abhijai...@gmail.com> wrote: > > Hi, > I have a job that uses processing time session window with inactivity gap of > 60ms where I intermittently run into the following exception. I'm trying to > figure out what happened here. Haven't been able to reproduce this scenario. > Any thoughts? > > java.lang.UnsupportedOperationException: The end timestamp of a > processing-time window cannot become earlier than the current processing time > by merging. Current processing time: 1560493731808 window: > TimeWindow{start=1560493731654, end=1560493731778} > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212) > at > org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311) > at > org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202) > at > org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) > at java.lang.Thread.run(Thread.java:745) > > -- > Warm Regards, > Abhishek Jain >