akalash commented on a change in pull request #17102: URL: https://github.com/apache/flink/pull/17102#discussion_r700272244
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeServiceUtil.java
##########
@@ -26,21 +26,22 @@
     /**
      * Returns the remaining delay of the processing time specified by {@code processingTimestamp}.
+     * This delay should guarantee that the timer will be fired after all record with timestamp <=
+     * {@code processingTimestamp} have been already processed.
      *
      * @param processingTimestamp the processing time in milliseconds
      * @param currentTimestamp the current processing timestamp; it usually uses {@link
      *     ProcessingTimeService#getCurrentProcessingTime()} to get
      * @return the remaining delay of the processing time
      */
-    public static long getProcessingTimeDelay(long processingTimestamp, long currentTimestamp) {
+    public static long getRecordProcessingTimeDelay(
+            long processingTimestamp, long currentTimestamp) {
 
-        // Two cases of timers here:
-        // (1) future/now timers(processingTimestamp >= currentTimestamp): delay the firing of the
-        // timer by 1 ms to align the semantics with watermark. A watermark T says we won't see
-        // elements in the future with a timestamp smaller or equal to T. With processing time, we
-        // therefore need to delay firing the timer by one ms.
-        // (2) past timers(processingTimestamp < currentTimestamp): do not need to delay the firing
-        // because currentTimestamp is larger than processingTimestamp pluses the 1ms offset.
+        // future/now timers(processingTimestamp >= currentTimestamp): delay the firing of the
+        // timer by 1 ms to align the semantics with watermark. A watermark T says we won't see
+        // elements in the future with a timestamp smaller or equal to T. Without this 1ms delay,
+        // if we had fired the timer for T at the timestamp T, it would be possible that we would
+        // process another record for timestamp == T in the same millisecond.
Review comment:
I am not sure about these changes myself, so if somebody thinks they are no better than what was there before, we can simply revert them.
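For readers weighing the two comment versions, here is a minimal sketch of the two-case behaviour that the removed comment describes. The method body is not part of the quoted hunk, so this is an assumption about the logic rather than the code in the PR. The class name `TimerDelaySketch` and the method name `delayFor` are hypothetical.

```java
/** Illustrative only: not the Flink implementation, just the logic the removed comment describes. */
public final class TimerDelaySketch {

    private TimerDelaySketch() {}

    /**
     * Hypothetical helper: milliseconds to wait before firing a timer registered for
     * processingTimestamp, given the current processing time.
     */
    public static long delayFor(long processingTimestamp, long currentTimestamp) {
        if (processingTimestamp >= currentTimestamp) {
            // Future/now timer: wait out the remaining time plus 1 ms, so that no record
            // stamped with processingTimestamp can still arrive in the same millisecond.
            return processingTimestamp - currentTimestamp + 1;
        }
        // Past timer: the clock is already beyond processingTimestamp, so no extra delay.
        return 0;
    }

    public static void main(String[] args) {
        System.out.println(delayFor(1_000L, 1_000L)); // timer for "now": prints 1
        System.out.println(delayFor(1_005L, 1_000L)); // timer 5 ms ahead: prints 6
        System.out.println(delayFor(990L, 1_000L));   // timer already in the past: prints 0
    }
}
```

The new comment in the hunk describes only the first branch, which is the part that explains why the extra millisecond exists.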