boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520322204
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
processWatermark1(Watermark.MAX_WATERMARK);
while (getCurrentOutputWatermark() <
Watermark.MAX_WATERMARK.getTimestamp()) {
invokeFinishBundle();
+ // Sleep for 5s to wait for any timer to be fired.
+ Thread.sleep(5000);
Review comment:
I found that `Thread.sleep(5000)` is not a correct solution. It only worked
occasionally because the bundle was small enough and I got lucky with the race
condition. I think we should figure out a way to fire the SDF processing-time
timer when a bundle is closed within the life cycle of one
`ExecutableStageDoFnOperator`.
Please correct me if my understanding is wrong:
* Does Flink start all operators at the same time and close them when the
input watermark reaches MAX_TIMESTAMP, or does it close operators in reverse
topological order, with `close()` being a blocking call?
* The system no longer fires processing-time timers once
`operator.close()` is invoked.
* The assumption around `ExecutableStageDoFnOperator` is that there is only
one bundle executing inside one operator. When the output watermark advances to
MAX_TIMESTAMP, we consider this bundle completed.
To support SDF-initiated checkpoints, we need several bundles to be invoked
inside one `ExecutableStageDoFnOperator` life cycle, which means we either:
* Enable Flink to fire processing-time timers after `Operator.close()` is
invoked -- this may not be preferable.
* Or we try to close the bundle before we reach `Operator.close()`.
* Or we manually drain SDF timers, sacrificing the ability to honor
`resumeDelay()`. For example, the user may want to reschedule the SDF residuals
in 5 mins but we have to fire them now.
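
To make the trade-off in the last option concrete, here is a minimal sketch in plain Java. It is not Beam or Flink code: `SdfTimerDrainSketch`, `PendingTimer`, `schedule`, `fireDue`, and `drainAll` are all hypothetical names standing in for the operator's pending processing-time timers. It shows that a residual scheduled with a 5-minute delay is not due after a 5-second wait, and that draining on close fires it right away regardless of the requested delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical stand-in for an operator's pending SDF timers; not Beam or Flink API. */
final class SdfTimerDrainSketch {

  /** A pending resume timer for one SDF residual, ordered by its fire time. */
  static final class PendingTimer implements Comparable<PendingTimer> {
    final String residualId;
    final long fireAtMillis;

    PendingTimer(String residualId, long fireAtMillis) {
      this.residualId = residualId;
      this.fireAtMillis = fireAtMillis;
    }

    @Override
    public int compareTo(PendingTimer other) {
      return Long.compare(fireAtMillis, other.fireAtMillis);
    }
  }

  private final PriorityQueue<PendingTimer> timers = new PriorityQueue<>();

  /** Registers a residual to be resumed resumeDelayMillis after nowMillis. */
  void schedule(String residualId, long nowMillis, long resumeDelayMillis) {
    timers.add(new PendingTimer(residualId, nowMillis + resumeDelayMillis));
  }

  /** Normal path: fires only the timers whose fire time has been reached. */
  List<String> fireDue(long nowMillis) {
    List<String> fired = new ArrayList<>();
    while (!timers.isEmpty() && timers.peek().fireAtMillis <= nowMillis) {
      fired.add(timers.poll().residualId);
    }
    return fired;
  }

  /** Close path: fires every pending timer now, ignoring the requested delay. */
  List<String> drainAll() {
    List<String> fired = new ArrayList<>();
    while (!timers.isEmpty()) {
      fired.add(timers.poll().residualId);
    }
    return fired;
  }

  public static void main(String[] args) {
    SdfTimerDrainSketch sketch = new SdfTimerDrainSketch();
    sketch.schedule("residual-1", 0L, 5 * 60 * 1000L); // user asked for 5 minutes
    System.out.println("fired after 5s wait: " + sketch.fireDue(5_000L));
    System.out.println("fired by drain on close: " + sketch.drainAll());
  }
}
```

In this sketch the residual still gets processed, but the delay the user asked for is lost, which is the cost of the manual-drain option.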
Do you have any ideas/suggestions? Thanks for your help!