akalash commented on a change in pull request #17102:
URL: https://github.com/apache/flink/pull/17102#discussion_r700272244



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceUtil.java
##########
@@ -26,21 +26,22 @@
 
     /**
      * Returns the remaining delay of the processing time specified by {@code 
processingTimestamp}.
+     * This delay should guarantee that the timer will be fired after all 
record with timestamp <=
+     * {@code processingTimestamp} have been already processed.
      *
      * @param processingTimestamp the processing time in milliseconds
      * @param currentTimestamp the current processing timestamp; it usually 
uses {@link
      *     ProcessingTimeService#getCurrentProcessingTime()} to get
      * @return the remaining delay of the processing time
      */
-    public static long getProcessingTimeDelay(long processingTimestamp, long 
currentTimestamp) {
+    public static long getRecordProcessingTimeDelay(
+            long processingTimestamp, long currentTimestamp) {
 
-        // Two cases of timers here:
-        // (1) future/now timers(processingTimestamp >= currentTimestamp): 
delay the firing of the
-        //   timer by 1 ms to align the semantics with watermark. A watermark 
T says we won't see
-        //   elements in the future with a timestamp smaller or equal to T. 
With processing time, we
-        //   therefore need to delay firing the timer by one ms.
-        // (2) past timers(processingTimestamp < currentTimestamp): do not 
need to delay the firing
-        //   because currentTimestamp is larger than processingTimestamp 
pluses the 1ms offset.
+        // future/now timers(processingTimestamp >= currentTimestamp): delay 
the firing of the
+        // timer by 1 ms to align the semantics with watermark. A watermark T 
says we won't see
+        // elements in the future with a timestamp smaller or equal to T. 
Without this 1ms delay,
+        // if we had fired the timer for T at the timestamp T, it would be 
possible that we would
+        // process another record for timestamp == T in the same millisecond.

Review comment:
       I actually am not sure about these changes by myself so if somebody 
think that it isn't better than it was before we can just revert these changes.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to