[ https://issues.apache.org/jira/browse/FLINK-23208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiayi Liao updated FLINK-23208:
-------------------------------
    Affects Version/s: 1.14.0
                       1.11.3
                       1.13.0
                       1.12.4

> Late processing timers need to wait 1ms at least to be fired
> ------------------------------------------------------------
>
>                 Key: FLINK-23208
>                 URL: https://issues.apache.org/jira/browse/FLINK-23208
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 1.11.0, 1.11.3, 1.13.0, 1.14.0, 1.12.4
>            Reporter: Jiayi Liao
>            Priority: Critical
>         Attachments: screenshot-1.png
>
>
> The problem comes from the code below:
> {code:java}
> public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
>     // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
>     // T says we won't see elements in the future with a timestamp smaller or equal to T.
>     // With processing time, we therefore need to delay firing the timer by one ms.
>     return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
> }
> {code}
> Assume a Flink job creates 1 timer per millisecond and is able to consume 1 timer/ms. Here is what happens:
> * Timestamp1 (1st ms): timer1 is registered and will be triggered at Timestamp2.
> * Timestamp2 (2nd ms): timer2 is registered and timer1 is triggered.
> * Timestamp3 (3rd ms): timer3 is registered and timer1 is consumed. After this, {{InternalTimerServiceImpl}} registers the next timer, which is timer2, and timer2 will be triggered at Timestamp4 (it waits at least 1 ms).
> * Timestamp4 (4th ms): timer4 is registered and timer2 is triggered.
> * Timestamp5 (5th ms): timer5 is registered and timer2 is consumed. After this, {{InternalTimerServiceImpl}} registers the next timer, which is timer3, and timer3 will be triggered at Timestamp6 (it waits at least 1 ms).
> As we can see, the job has the capacity to consume 1 timer/ms, but it effectively consumes only 0.5 timer/ms, because every late timer still waits 1 ms before it fires.
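The timeline above can be replayed with a small simulation. This is only a rough sketch, not Flink code: the class `LateTimerThroughputSketch`, the method `consumedBy`, and the assumption that handling a fired timer takes exactly 1 ms are all illustrative. Only the body of `getProcessingTimeDelay` is taken from the snippet quoted in the issue.

```java
final class LateTimerThroughputSketch {

    // Delay formula as quoted in the issue.
    static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
    }

    // Hypothetical model of the timeline in the issue:
    // timer k is registered at millisecond k with timestamp k, only the head
    // timer is scheduled at any time, and consuming a fired timer is assumed
    // to take exactly 1 ms. Returns how many timers are fully consumed by
    // millisecond horizonMs.
    static int consumedBy(long horizonMs) {
        long now = 1;            // time at which the head timer gets scheduled
        long headTimestamp = 1;  // timestamp of the head timer
        int consumed = 0;
        while (true) {
            // A timer cannot be scheduled before it has been registered.
            long scheduledAt = Math.max(now, headTimestamp);
            long firesAt = scheduledAt + getProcessingTimeDelay(headTimestamp, scheduledAt);
            long consumedAt = firesAt + 1; // assumed 1 ms processing cost
            if (consumedAt > horizonMs) {
                return consumed;
            }
            consumed++;
            headTimestamp++;
            now = consumedAt;
        }
    }

    public static void main(String[] args) {
        // 101 timers have been registered by millisecond 101.
        System.out.println(consumedBy(101) + " timers consumed in 101 ms");
    }
}
```

Under these assumptions, timer k is consumed at millisecond 2k + 1, so only 50 of the timers registered in the first 101 ms have been handled by then, which matches the 0.5 timer/ms rate described in the report.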
> Another problem is that we cannot observe the delay from the lag metrics of the source (Kafka). Instead, all we can tell is that the output is emitted much later than expected.
> I've added a metric in our internal version, and we can see that the lag of timer triggering keeps increasing:
> !screenshot-1.png!
> *In other words, we should never make a late processing timer wait 1 ms. I think a simple change would be as below:*
> {code:java}
> return Math.max(processingTimestamp - currentTimestamp, -1) + 1;
> {code}
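To check which timers the proposed one-line change affects, the two formulas can be compared side by side. This is a minimal sketch: the class `DelayFormulaComparison`, the method names `currentDelay` and `proposedDelay`, and the sample timestamps are hypothetical. Only the two return expressions come from the issue.

```java
final class DelayFormulaComparison {

    // Existing formula quoted in the issue: every timer waits at least 1 ms.
    static long currentDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, 0) + 1;
    }

    // Formula proposed in the issue: timers that are already late get a 0 ms delay.
    static long proposedDelay(long processingTimestamp, long currentTimestamp) {
        return Math.max(processingTimestamp - currentTimestamp, -1) + 1;
    }

    public static void main(String[] args) {
        long now = 100; // illustrative current processing time
        long[] timerTimestamps = {95, 99, 100, 103}; // late, late by 1 ms, due now, in the future
        for (long ts : timerTimestamps) {
            System.out.printf(
                    "timer=%d now=%d current=%dms proposed=%dms%n",
                    ts, now, currentDelay(ts, now), proposedDelay(ts, now));
        }
    }
}
```

With these sample values, the two formulas differ only for timers whose timestamp is already in the past (0 ms instead of 1 ms). A timer due exactly now still waits 1 ms, and a future timer still waits its remaining time plus 1 ms, so the alignment with watermark semantics described in the original code comment appears to be preserved for timers that are not late.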