RocMarshal commented on code in PR #22852:
URL: https://github.com/apache/flink/pull/22852#discussion_r1242057574


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -642,7 +643,32 @@ private static class WatermarkAggregator<T> {
          *     Optional.empty()} otherwise.
          */
         public Optional<Watermark> aggregate(T key, Watermark watermark) {
-            watermarks.put(key, watermark);
+            Watermark oldWatermark = watermarks.put(key, watermark);
+            // Step (1): Update the latest watermark of current key as the aggregatedWatermark
+            // directly if it is less than the aggregatedWatermark.
+            if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) {
+                aggregatedWatermark = watermark;
+                return Optional.of(aggregatedWatermark);
+            }
+
+            // Step(2): The aggWM won't change when these conditions are met, so return directly:
+            // case1. The latest WM of the current key isn't changed
+            // case2. When oldWatermark isn't null and is greater than aggWm, it means that aggWm
+            // comes from other keys. If new WM is greater than or equal to aggWm, then aggWm must
+            // not change.
+            // case3. When oldWatermark is null and {@link watermarks} has other keys, it means that
+            // aggWm comes from other keys. If new WM is greater than or equal to aggWm, then aggWm
+            // must not change.
+            // Note: step(1) have returned when `watermark < aggregatedWatermark`, so all calls

Review Comment:
   How about rewording the note as follows?
   ```
   There is an implicit condition `watermark >= aggregatedWatermark` here due to step(1), which is why it's not written in the code of case2 and case3.
   ```
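
   For readers following the thread, the three steps discussed in the diff can be sketched roughly as below. This is only an illustration under stated assumptions, not the code in the PR: the class name `MinWatermarkSketch`, the use of `String` keys and plain `long` timestamps instead of Flink's `Watermark`, the `Long.MIN_VALUE` initial value, and the final recompute step are all hypothetical. It shows where the implicit `watermark >= aggregated` condition comes from.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the aggregator under review; names and types are assumptions.
class MinWatermarkSketch {
    private final Map<String, Long> watermarks = new HashMap<>();
    // Assumed initial value; the aggregate is meant to track the minimum over all keys.
    private long aggregated = Long.MIN_VALUE;

    /** Returns the new aggregated (minimum) watermark if it changed, empty otherwise. */
    Optional<Long> aggregate(String key, long watermark) {
        Long old = watermarks.put(key, watermark);

        // Step (1): a watermark below the current aggregate becomes the new aggregate.
        if (watermark < aggregated) {
            aggregated = watermark;
            return Optional.of(aggregated);
        }

        // Step (2): from here on, watermark >= aggregated holds implicitly because
        // step (1) has already returned otherwise, so the cases below do not re-check it.
        boolean unchanged = old != null && old == watermark;           // case1
        boolean oldAboveAggregate = old != null && old > aggregated;   // case2
        boolean newKeyAmongOthers = old == null && watermarks.size() > 1; // case3
        if (unchanged || oldAboveAggregate || newKeyAmongOthers) {
            return Optional.empty();
        }

        // Step (3), assumed for this sketch: fall back to a full scan for the minimum.
        long newMin = Collections.min(watermarks.values());
        if (newMin == aggregated) {
            return Optional.empty();
        }
        aggregated = newMin;
        return Optional.of(aggregated);
    }

    public static void main(String[] args) {
        MinWatermarkSketch agg = new MinWatermarkSketch();
        System.out.println(agg.aggregate("a", 10L)); // first key: full scan
        System.out.println(agg.aggregate("b", 20L)); // case3: new key, others present
        System.out.println(agg.aggregate("b", 5L));  // step (1): below the aggregate
    }
}
```

   In this sketch the full scan in step (3) only runs when the key that currently holds the minimum advances, or when the very first key arrives; every other call returns from step (1) or step (2).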


