davidradl commented on code in PR #25792:
URL: https://github.com/apache/flink/pull/25792#discussion_r1882410030


##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java:
##########
@@ -368,10 +385,12 @@ protected void processWatermarkStatus(WatermarkStatus 
watermarkStatus, int index
                     boolean wasIdle = combinedWatermark.isIdle();
                     // index is 0-based
                     if (combinedWatermark.updateStatus(index, 
watermarkStatus.isIdle())) {
-                        super.processWatermark(
-                                new 
Watermark(combinedWatermark.getCombinedWatermark()));
-                    }
-                    if (wasIdle != combinedWatermark.isIdle()) {
+                        doProcessWatermark(

Review Comment:
   it would be worth including a comment detailing what is happening here. Here 
is my understanding , please correct me if I am wrong:
   
   I assume when the if is true: then the combined watermark status has 
changed, from either active to idle or the other way round. In this case we 
call doProcessWatermark, the first parameter news up a Watermark - I can only 
assume that we want to clone the combined watermark. If the idleness of the 
combined watermark is unchanged when we do nothing, otherwise we need to send 
an idle or active watermark , based on on the supplied watermarkStatus which 
should have the same state as the combined watermark. 
   
   In the else - if the combined watermark did not change then we check whether 
the existing combined watermark idleness is not the same as the supplied 
watermark. In this case why do we emit the watermark? I assume we could have 2 
idle sources coming into an operator so the combined water mark would be idle, 
if one of the sources becomes active, then the combined watermark will not have 
changed status so why would we emit a watermark.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to