Repository: flink
Updated Branches:
  refs/heads/release-1.5 1f36a66ec -> 726f1cce3


[FLINK-9857] Delay firing of processing-time timers by 1 ms


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/726f1cce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/726f1cce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/726f1cce

Branch: refs/heads/release-1.5
Commit: 726f1cce32ed3f0f7c04c289f2d089438ecdaae6
Parents: 1f36a66
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Mon Jul 16 15:58:07 2018 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Tue Jul 17 14:54:17 2018 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/SystemProcessingTimeService.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/726f1cce/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 4e4208f..987ddd9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -109,7 +109,11 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
         */
        @Override
        public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
-               long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
+
+               // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
+               // T says we won't see elements in the future with a timestamp smaller or equal to T.
+               // With processing time, we therefore need to delay firing the timer by one ms.
+               long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
 
                // we directly try to register the timer and only react to the status on exception
                // that way we save unnecessary volatile accesses for each timer

Reply via email to