haishui126 commented on code in PR #22291:
URL: https://github.com/apache/flink/pull/22291#discussion_r1160388130


##########
flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java:
##########
@@ -178,9 +178,16 @@ void announceCombinedWatermark() {
                                     
aggregator.getAggregatedWatermark().getTimestamp());
                         });
 
-        long maxAllowedWatermark =
-                globalCombinedWatermark.getTimestamp()
-                        + 
watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
+        long maxAllowedWatermark;
+        try {
+            maxAllowedWatermark =

Review Comment:
   Thanks for the suggestion, but the 
`Long.MAX_VALUE-globalCombinedWatermark.getTimestamp()` will overflow when 
`globalCombinedWatermark.getTimestamp()` is negated. If It's necessary to 
replace the try-catch, we can use: 
   ```java
           long maxAllowedWatermark =
                   globalCombinedWatermark.getTimestamp()
                           + 
watermarkAlignmentParams.getMaxAllowedWatermarkDrift();
           // check maxAllowedWatermark arithmetic overflow when the source is 
idle
           if (((globalCombinedWatermark.getTimestamp() ^ maxAllowedWatermark)
                           & 
(watermarkAlignmentParams.getMaxAllowedWatermarkDrift()
                                   ^ maxAllowedWatermark))
                   < 0) {
               maxAllowedWatermark = Watermark.MAX_WATERMARK.getTimestamp();
           }
   ```
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to