Zakelly commented on code in PR #25792:
URL: https://github.com/apache/flink/pull/25792#discussion_r1883189632
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperator.java:
##########
@@ -368,10 +385,12 @@ protected void processWatermarkStatus(WatermarkStatus
watermarkStatus, int index
boolean wasIdle = combinedWatermark.isIdle();
// index is 0-based
if (combinedWatermark.updateStatus(index,
watermarkStatus.isIdle())) {
- super.processWatermark(
- new
Watermark(combinedWatermark.getCombinedWatermark()));
- }
- if (wasIdle != combinedWatermark.isIdle()) {
+ doProcessWatermark(
Review Comment:
This part is basically a rewrite of the overriden method,
`AbstractStreamOperator#emitWatermarkStatus`, considering the async timer
processing and emitting watermark or status after that. The logic in the method
of parent class is more readable. Which is:
- If a combined watermark changes (because some of the watermarks become
active), we process the combined watermark.
- If the combined watermark idle/active status switched, notify the
downstream.
These two if(s) are independent.
This PR ensures watermark status emitting after the watermark processing
under the async context. I don't think a comment or two would explain this
logic well, but you can get the point if you compare this method with the one
from parent class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]