davidradl commented on code in PR #25792:
URL: https://github.com/apache/flink/pull/25792#discussion_r1882410030
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java:
##########
@@ -368,10 +385,12 @@ protected void processWatermarkStatus(WatermarkStatus
watermarkStatus, int index
boolean wasIdle = combinedWatermark.isIdle();
// index is 0-based
if (combinedWatermark.updateStatus(index,
watermarkStatus.isIdle())) {
- super.processWatermark(
- new
Watermark(combinedWatermark.getCombinedWatermark()));
- }
- if (wasIdle != combinedWatermark.isIdle()) {
+ doProcessWatermark(
Review Comment:
it would be worth including a comment detailing what is happening here. Here
is my understanding , please correct me if I am wrong:
I assume when the if is true: then the combined watermark status has
changed, from either active to idle or the other way round. In this case we
call doProcessWatermark, the first parameter news up a Watermark - I can only
assume that we want to clone the combined watermark. If the idleness of the
combined watermark is unchanged when we do nothing, otherwise we need to send
an idle or active watermark , based on on the supplied watermarkStatus which
should have the same status as the combined watermark.
In the else - if the combined watermark did not change then we check whether
the existing combined watermark idleness is not the same as the supplied
watermark.
- I assume we could have 2 idle sources coming into an operator so the
combined watermark status would be idle, if one of the sources becomes active,
then the combined watermark will now be able to progress ignoring the other
idle source.
- I assume we could have 2 active sources coming into an operator so the
combined watermark status would be active, if one of the sources becomes idle,
then the combined watermark will still be able to progress ignoring the idle
source. In this case why do we need to send a watermark?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]