Repository: flink Updated Branches: refs/heads/release-1.5 1f36a66ec -> 726f1cce3
[FLINK-9857] Delay firing of processing-time timers by 1 ms Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/726f1cce Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/726f1cce Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/726f1cce Branch: refs/heads/release-1.5 Commit: 726f1cce32ed3f0f7c04c289f2d089438ecdaae6 Parents: 1f36a66 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Mon Jul 16 15:58:07 2018 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Jul 17 14:54:17 2018 +0200 ---------------------------------------------------------------------- .../streaming/runtime/tasks/SystemProcessingTimeService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/726f1cce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java index 4e4208f..987ddd9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java @@ -109,7 +109,11 @@ public class SystemProcessingTimeService extends ProcessingTimeService { */ @Override public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) { - long delay = Math.max(timestamp - getCurrentProcessingTime(), 0); + + // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark + // T says we won't see elements in the future with a timestamp smaller or equal to T. + // With processing time, we therefore need to delay firing the timer by one ms. + long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1; // we directly try to register the timer and only react to the status on exception // that way we save unnecessary volatile accesses for each timer