boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520322204



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
   I found that the 5s `Thread.sleep(5000)` is not a correct solution. It only worked occasionally because the bundle was small enough and I got lucky with the race condition. I think we should find a way to fire the SDF processing-time timers when a bundle is closed within the life cycle of a single `ExecutableStageDoFnOperator`.
   Please correct me if I'm misunderstanding anything:
   * Does Flink start all operators at the same time and close each operator when its input watermark reaches MAX_TIMESTAMP? Or does it close operators in reverse topological order, with `close()` being a blocking call?
   * Processing-time timers are no longer fired by the system once `operator.close()` has been invoked.
   * The assumption behind `ExecutableStageDoFnOperator` is that only one bundle executes inside one operator. When the output watermark advances to MAX_TIMESTAMP, we consider that bundle complete.
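   A toy Java model of the second and third points may help frame the problem. It assumes, as a simplification, that the operator's processing-time timer service behaves like a `ScheduledExecutorService` that is shut down in `close()`. `ToyOperator`, `finishBundleWithResidual`, and `TimerLifecycleDemo` are hypothetical names, not Flink or Beam APIs.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical toy model, NOT Flink's API: the processing-time timer service is shut down
// when the operator is closed, so any timer still pending at that point is dropped.
class ToyOperator {
  private final ScheduledExecutorService timerService =
      Executors.newSingleThreadScheduledExecutor();
  final AtomicBoolean residualProcessed = new AtomicBoolean(false);

  /** Simulates an SDF checkpoint: the residual is rescheduled via a processing-time timer. */
  void finishBundleWithResidual(long resumeDelayMillis) {
    timerService.schedule(
        () -> residualProcessed.set(true), resumeDelayMillis, TimeUnit.MILLISECONDS);
  }

  /** Models "no more processing-time timers fire after close()". Returns dropped timers. */
  int close() {
    // shutdownNow() returns the tasks that never started executing, i.e. the lost timers.
    return timerService.shutdownNow().size();
  }
}

class TimerLifecycleDemo {
  public static void main(String[] args) {
    ToyOperator op = new ToyOperator();
    // The final bundle checkpoints and asks to resume in 5 minutes.
    op.finishBundleWithResidual(TimeUnit.MINUTES.toMillis(5));
    int dropped = op.close();
    System.out.println(
        "dropped timers: " + dropped + ", residual processed: " + op.residualProcessed.get());
  }
}
```

   Running `TimerLifecycleDemo` reports one dropped timer and an unprocessed residual, which mirrors how a residual scheduled during the last bundle would be lost once `close()` stops the timer service.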
   
   To support SDF-initiated checkpoints, we need several bundles to be invoked within the life cycle of one `ExecutableStageDoFnOperator`, which means we must either:
   
   * Enable Flink to fire processing-time timers after `Operator.close()` is invoked; this may not be preferable.
   * Or try to close the bundle before we reach `Operator.close()`.
   * Or manually drain the SDF timers, sacrificing the ability to honor `resumeDelay()`. For example, the user may want to reschedule the SDF residuals in 5 minutes, but we would have to fire them immediately.
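   The third option could look roughly like the following plain-Java sketch. It is only an assumption about how a drain loop might be structured: `SdfTimerDrainer`, `register`, and `drain` are hypothetical names, and residuals are modeled as strings. Timers fire earliest-first but immediately, so the requested resume time only affects ordering.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Function;

// Hypothetical model of "manually drain SDF timers"; none of these names exist in Beam or Flink.
class SdfTimerDrainer {
  /** A pending resume timer: the residual to process and the time it asked to fire at. */
  static final class PendingTimer {
    final String residual;
    final long fireAtMillis;

    PendingTimer(String residual, long fireAtMillis) {
      this.residual = residual;
      this.fireAtMillis = fireAtMillis;
    }
  }

  // Earliest requested fire time first.
  private final PriorityQueue<PendingTimer> pending =
      new PriorityQueue<>(Comparator.comparingLong((PendingTimer t) -> t.fireAtMillis));

  void register(String residual, long fireAtMillis) {
    pending.add(new PendingTimer(residual, fireAtMillis));
  }

  /**
   * Fires every pending timer right away, ignoring how far in the future it was scheduled.
   * Processing a residual may checkpoint again and return a new residual (or null when done);
   * that residual is re-registered and fired in this same loop, so its resume delay is lost.
   */
  List<String> drain(Function<String, String> processResidual, long resumeDelayMillis) {
    List<String> fired = new ArrayList<>();
    while (!pending.isEmpty()) {
      PendingTimer timer = pending.poll();
      fired.add(timer.residual);
      String next = processResidual.apply(timer.residual);
      if (next != null) {
        register(next, timer.fireAtMillis + resumeDelayMillis);
      }
    }
    return fired;
  }
}
```

   Because a drained residual may checkpoint again, the loop keeps going until no residual remains; an SDF that always returns a residual would keep it spinning, so a real version would likely need some bound.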
   
   Do you have any ideas/suggestions? Thanks for your help!



