boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520874302



##########
File path: runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
     processWatermark1(Watermark.MAX_WATERMARK);
     while (getCurrentOutputWatermark() < Watermark.MAX_WATERMARK.getTimestamp()) {
       invokeFinishBundle();
+      // Sleep for 5s to wait for any timer to be fired.
+      Thread.sleep(5000);

Review comment:
       I double-checked the implementation of `DoFnOperator` and
`ExecutableStageDoFnOperator`: by default, we already invoke `finishBundle`
once a bundle reaches 1000 input elements or 1s of processing time.
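   To make the count-or-time rule concrete, here is a minimal sketch, assuming a bundle closes as soon as either threshold is hit. `BundleTrigger`, `onElement`, and `onProcessingTime` are hypothetical names for illustration, not Beam's actual API; only the 1000-element / 1s defaults come from the comment above.

```java
/**
 * Hypothetical count-or-time bundle trigger. Thresholds mirror the defaults
 * mentioned in the review (1000 elements, 1000 ms); names are illustrative.
 */
final class BundleTrigger {
  private final long maxElements;
  private final long maxMillis;
  private long elementsInBundle = 0;
  private long bundleStartMillis = -1; // -1 means no bundle is currently open
  private int finishedBundles = 0;

  BundleTrigger(long maxElements, long maxMillis) {
    this.maxElements = maxElements;
    this.maxMillis = maxMillis;
  }

  /** Called once per input element with the current processing time. */
  void onElement(long nowMillis) {
    if (bundleStartMillis < 0) {
      bundleStartMillis = nowMillis; // lazily open a new bundle
    }
    elementsInBundle++;
    checkFinish(nowMillis);
  }

  /** Called from a periodic processing-time callback. */
  void onProcessingTime(long nowMillis) {
    checkFinish(nowMillis);
  }

  private void checkFinish(long nowMillis) {
    if (bundleStartMillis < 0) {
      return; // nothing buffered, nothing to finish
    }
    if (elementsInBundle >= maxElements || nowMillis - bundleStartMillis >= maxMillis) {
      finishBundle();
    }
  }

  private void finishBundle() {
    // A real operator would flush buffered output here (e.g. invokeFinishBundle()).
    finishedBundles++;
    elementsInBundle = 0;
    bundleStartMillis = -1;
  }

  int finishedBundles() {
    return finishedBundles;
  }

  public static void main(String[] args) {
    BundleTrigger trigger = new BundleTrigger(1000, 1000);
    // 2500 elements at t = 0 ms: the element-count limit closes two bundles.
    for (int i = 0; i < 2500; i++) {
      trigger.onElement(0);
    }
    // The remaining 500 elements are flushed once 1 s of processing time has passed.
    trigger.onProcessingTime(1000);
    System.out.println(trigger.finishedBundles()); // prints 3
  }
}
```

   The point is that neither threshold helps once `close()` has started: bundles are already being finished promptly, so the fixed sleep in the diff is not what limits throughput.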
   The real problem for SDF is that an SDF naturally reads from `Impulse`
and executes as a high fan-out DoFn. With the current structure, once `Impulse`
finishes, `close()` is called on the SDF operator, but by then no more
processing-time timers can be registered. Simply draining timers from within
the operator itself is not ideal.
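   A minimal model of why draining alone loses work, assuming an SDF that processes one chunk per timer firing and schedules its residual through a new processing-time timer. `SdfCloseDemo`, `TimerService`, and `runUntilDrained` are hypothetical; the only behavior taken from the comment is that registration is rejected once `close()` has started.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class SdfCloseDemo {
  /** Hypothetical timer service that rejects new processing-time timers once close() starts. */
  static final class TimerService {
    private final Deque<Long> timers = new ArrayDeque<>();
    private boolean closing = false;

    boolean register(long timestamp) {
      if (closing) {
        return false; // rejected: close() has already started
      }
      timers.addLast(timestamp);
      return true;
    }

    void startClose() {
      closing = true;
    }

    Long poll() {
      return timers.pollFirst();
    }
  }

  /**
   * Models an SDF that processes one chunk per timer firing and schedules the
   * rest as a residual via a processing-time timer. Returns unprocessed chunks.
   */
  static int runUntilDrained(int chunks, boolean closeFirst) {
    TimerService service = new TimerService();
    int remaining = chunks;
    service.register(0L); // initial resume timer, registered before close()
    if (closeFirst) {
      service.startClose(); // Impulse finished, so close() is invoked early
    }
    Long timestamp;
    while ((timestamp = service.poll()) != null) {
      remaining--; // process one chunk of the restriction
      if (remaining > 0) {
        service.register(timestamp + 1000L); // silently dropped during close()
      }
    }
    return remaining;
  }

  public static void main(String[] args) {
    System.out.println(runUntilDrained(3, false)); // prints 0: every residual resumed
    System.out.println(runUntilDrained(3, true)); // prints 2: residual work is stranded
  }
}
```

   Draining fires only the timers that already exist, so any residual scheduled while draining is lost; sleeping between drain rounds only papers over this.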
   Could we change something here? For example, should the operator wait
for the global watermark to advance to MAX_TIMESTAMP before finishing? Or
should the task invoke `operator.close()` only once the global watermark has
advanced to MAX_TIMESTAMP?
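   A rough sketch of the first option, assuming the operator can observe a global watermark: instead of `Thread.sleep(5000)`, `close()` blocks on a monitor until the watermark reaches the maximum (or a timeout elapses). `WatermarkGate` and its methods are hypothetical; `MAX_TIMESTAMP` here is a stand-in for the timestamp of Flink's `Watermark.MAX_WATERMARK` (`Long.MAX_VALUE`).

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical gate: blocks until the global watermark reaches the maximum,
 * replacing a fixed sleep with a wait that wakes as soon as progress happens.
 */
final class WatermarkGate {
  // Stand-in for the timestamp of Flink's Watermark.MAX_WATERMARK.
  static final long MAX_TIMESTAMP = Long.MAX_VALUE;

  private long globalWatermark = Long.MIN_VALUE;

  /** Called whenever the (hypothetical) global watermark advances. */
  synchronized void advanceTo(long watermark) {
    if (watermark > globalWatermark) {
      globalWatermark = watermark;
      notifyAll(); // wake any thread blocked in awaitMax()
    }
  }

  /** Waits until the watermark reaches MAX_TIMESTAMP; returns false on timeout. */
  synchronized boolean awaitMax(long timeout, TimeUnit unit) throws InterruptedException {
    long deadline = System.nanoTime() + unit.toNanos(timeout);
    while (globalWatermark < MAX_TIMESTAMP) {
      long remainingNanos = deadline - System.nanoTime();
      if (remainingNanos <= 0) {
        return false;
      }
      // Timed Object.wait on this monitor; releases the lock while waiting.
      TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
    }
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    WatermarkGate gate = new WatermarkGate();
    Thread upstream = new Thread(() -> {
      gate.advanceTo(1_000L); // bounded input still in progress
      gate.advanceTo(MAX_TIMESTAMP); // all inputs, including SDF residuals, are done
    });
    upstream.start();
    boolean reachedMax = gate.awaitMax(10, TimeUnit.SECONDS);
    upstream.join();
    System.out.println(reachedMax); // prints true
  }
}
```

   The second option (the task calling `operator.close()` only after the global watermark reaches the maximum) would move the same wait out of the operator and into whatever drives its lifecycle.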



