davidradl commented on code in PR #25792:
URL: https://github.com/apache/flink/pull/25792#discussion_r1882410030
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java:
##########
@@ -368,10 +385,12 @@ protected void processWatermarkStatus(WatermarkStatus watermarkStatus, int index
boolean wasIdle = combinedWatermark.isIdle();
// index is 0-based
if (combinedWatermark.updateStatus(index, watermarkStatus.isIdle())) {
- super.processWatermark(
- new Watermark(combinedWatermark.getCombinedWatermark()));
- }
- if (wasIdle != combinedWatermark.isIdle()) {
+ doProcessWatermark(
Review Comment:
It would be worth adding a comment that explains what is happening here. Here
is my understanding; please correct me if I am wrong.
I assume that when the if is true, the combined watermark status has changed,
either from active to idle or the other way round. In that case we call
doProcessWatermark, whose first argument creates a new Watermark; I can only
assume that we want to clone the combined watermark. If the idleness of the
combined watermark is unchanged then we do nothing; otherwise we need to send
an idle or active watermark, based on the supplied watermarkStatus, which
should have the same status as the combined watermark.
In the else branch, where the combined watermark did not change, we check
whether the existing combined watermark idleness differs from the supplied
watermark. In this case, why do we emit the watermark? I assume we could have
two idle sources coming into an operator, so the combined watermark would be
idle. If one of the sources becomes active, the combined watermark will not
have changed status, so why would we emit a watermark?
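To make the scenario concrete, here is a minimal sketch of how I picture the
combined status. It is a hypothetical model, not the actual Flink class:
CombinedStatusSketch and its fields are invented for illustration, and it
assumes (a) the combined status is idle only when every input is idle and
(b) updateStatus returns true only when the combined watermark value advances.
If the real implementation differs on either point, the comment in the code
should say so.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of a combined watermark over several inputs. */
class CombinedStatusSketch {
    private final long[] watermarks;
    private final boolean[] idle;
    private long combinedWatermark = Long.MIN_VALUE;
    private boolean combinedIdle;

    CombinedStatusSketch(int inputs) {
        watermarks = new long[inputs];
        idle = new boolean[inputs];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    /** Records a watermark for one input (0-based index). */
    boolean updateWatermark(int index, long watermark) {
        watermarks[index] = Math.max(watermarks[index], watermark);
        return recompute();
    }

    /** Records the idleness of one input (0-based index). */
    boolean updateStatus(int index, boolean isIdle) {
        idle[index] = isIdle;
        return recompute();
    }

    boolean isIdle() {
        return combinedIdle;
    }

    long getCombinedWatermark() {
        return combinedWatermark;
    }

    /** Assumption: returns true only if the combined watermark value advanced. */
    private boolean recompute() {
        long min = Long.MAX_VALUE;
        boolean allIdle = true;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) { // idle inputs do not hold back the minimum
                min = Math.min(min, watermarks[i]);
                allIdle = false;
            }
        }
        combinedIdle = allIdle; // assumption: idle only when all inputs are idle
        if (!allIdle && min > combinedWatermark) {
            combinedWatermark = min;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        CombinedStatusSketch c = new CombinedStatusSketch(2);
        c.updateWatermark(0, 5);
        c.updateWatermark(1, 10);
        c.updateStatus(0, true);
        c.updateStatus(1, true); // both sources are now idle

        boolean wasIdle = c.isIdle();
        boolean advanced = c.updateStatus(0, false); // one source becomes active
        System.out.println(
                "advanced=" + advanced + ", wasIdle=" + wasIdle + ", isIdle=" + c.isIdle());
    }
}
```

Under these assumptions, the return value of updateStatus and the change in
idleness are independent signals: in the two-idle-sources case the watermark
value does not advance, yet the combined idleness flips. Whether that matches
the real behaviour is exactly what I would like the code comment to state.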
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]