Hi,

Thanks for reporting the issue. I think this might be caused by 
System.currentTimeMillis() not being monotonic [1], combined with the fact 
that Flink calls this function multiple times per element (at least twice: 
first to create the window, and then again to perform the check that failed 
in your case). I’m pretty sure this is a more general problem that affects 
other places as well. I have created a ticket for this [2].
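
To make the suspected mechanism concrete, here is a minimal, self-contained sketch. It is not Flink's actual code: the Window class, the simplified check and the scripted clock are all hypothetical stand-ins. It only shows how two separate clock reads can disagree enough to trip a check like the one in the exception, while a single read that is reused cannot.

```java
import java.util.function.LongSupplier;

public class ClockReadSketch {
    // Hypothetical stand-in for a session window [start, end); not Flink's TimeWindow.
    static final class Window {
        final long start;
        final long end;
        Window(long start, long end) { this.start = start; this.end = end; }
    }

    static final long GAP_MS = 60; // inactivity gap from the original report

    static Window assign(long now) {
        return new Window(now, now + GAP_MS);
    }

    // Reads the clock twice: once to assign the window, once to validate it.
    // If the clock jumps (or enough time passes) between the reads, the check fails.
    static boolean passesWithTwoReads(LongSupplier clock) {
        Window w = assign(clock.getAsLong());
        return w.end > clock.getAsLong(); // simplified version of the failing check
    }

    // Reads the clock once and reuses the value, so both steps see the same time.
    static boolean passesWithOneRead(LongSupplier clock) {
        long now = clock.getAsLong();
        Window w = assign(now);
        return w.end > now;
    }

    // Simulated clock: returns the scripted values in order, then repeats the last one.
    static LongSupplier scripted(long... values) {
        int[] i = {0};
        return () -> values[Math.min(i[0]++, values.length - 1)];
    }

    public static void main(String[] args) {
        // The simulated clock jumps from 1000 to 1100 between the two reads.
        System.out.println(passesWithTwoReads(scripted(1000, 1100))); // prints false
        System.out.println(passesWithOneRead(scripted(1000, 1100)));  // prints true
    }
}
```

In a real job the clock would be System.currentTimeMillis() rather than a scripted supplier; the injected LongSupplier is only there to make the jump reproducible.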

I’m not sure there is an easy hot fix for this. You could increase the 
inactivity gap, switch to ingestion/event time (which is preferable anyway), 
make sure that the machine’s clock doesn’t change, or just ignore the problem 
and accept an occasional failure.
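
As a rough illustration of why event time sidesteps the problem, the sketch below groups events into sessions using only the timestamps carried by the events themselves, so the machine clock never enters the computation. This is plain Java, not the Flink API; the class and method names are made up for the example, and merging windows that merely touch is an assumption on my side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EventTimeSessions {
    // Returns sessions as {start, end} pairs, where end = last event timestamp + gap.
    // The result depends only on the event timestamps, never on the wall clock.
    static List<long[]> sessions(long[] eventTimestamps, long gapMs) {
        long[] sorted = eventTimestamps.clone();
        Arrays.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                // Event falls inside (or touches) the open session: extend it.
                last[1] = Math.max(last[1], ts + gapMs);
            } else {
                // Gap exceeded: start a new session.
                result.add(new long[] {ts, ts + gapMs});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps in milliseconds, with a 60 ms gap.
        for (long[] s : sessions(new long[] {0, 40, 64, 200}, 60)) {
            System.out.println(s[0] + ".." + s[1]);
        }
        // prints:
        // 0..124
        // 200..260
    }
}
```

Running this twice on the same input always yields the same sessions, which is not guaranteed when the windows are derived from the processing-time clock.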

Piotrek

[1] https://stackoverflow.com/questions/2978598/will-system-currenttimemillis-always-return-a-value-previous-calls
[2] https://issues.apache.org/jira/browse/FLINK-12872

> On 14 Jun 2019, at 10:14, Abhishek Jain <abhijai...@gmail.com> wrote:
> 
> Hi,
> I have a job that uses processing time session window with inactivity gap of 
> 60ms where I intermittently run into the following exception. I'm trying to 
> figure out what happened here. Haven't been able to reproduce this scenario. 
> Any thoughts?
> 
> java.lang.UnsupportedOperationException: The end timestamp of a processing-time window cannot become earlier than the current processing time by merging. Current processing time: 1560493731808 window: TimeWindow{start=1560493731654, end=1560493731778}
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:325)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:311)
>       at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
>       at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:311)
>       at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
>       at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>       at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>       at java.lang.Thread.run(Thread.java:745)
> 
> -- 
> Warm Regards,
> Abhishek Jain
> 
