Zakelly commented on code in PR #25792:
URL: https://github.com/apache/flink/pull/25792#discussion_r1883199135
##########
flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/operators/AbstractAsyncStateStreamOperatorV2.java:
##########
@@ -259,17 +265,33 @@ public void processWatermark(Watermark mark) throws Exception {
super.processWatermark(mark);
return;
}
- asyncExecutionController.processNonRecord(
- () -> {
- // todo: make async operator deal with interruptible watermark
- if (timeServiceManager != null) {
- CompletableFuture<Void> future = timeServiceManager.advanceWatermark(mark);
- future.thenAccept(v -> output.emitWatermark(mark));
- asyncExecutionController.drainWithTimerIfNeeded(future);
- } else {
- output.emitWatermark(mark);
- }
- });
+ asyncExecutionController.processNonRecord(() -> doProcessWatermark(mark, null));
+ }
+
+ /**
+ * Handle the watermark and timers, then run a provided {@link Runnable} asynchronously right
+ * after the watermark is emitted.
+ *
+ * @param mark The watermark.
+ * @param postAction The runnable for post action.
+ */
+ protected void doProcessWatermark(Watermark mark, @Nullable Runnable postAction)
+ throws Exception {
+ // todo: make async operator deal with interruptible watermark
Review Comment:
Please read https://issues.apache.org/jira/browse/FLINK-20217
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]