[
https://issues.apache.org/jira/browse/BEAM-10940?focusedWorklogId=509532&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-509532
]
ASF GitHub Bot logged work on BEAM-10940:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 10/Nov/20 06:32
Start Date: 10/Nov/20 06:32
Worklog Time Spent: 10m
Work Description: boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r520322204
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -502,6 +690,8 @@ public void close() throws Exception {
processWatermark1(Watermark.MAX_WATERMARK);
while (getCurrentOutputWatermark() <
Watermark.MAX_WATERMARK.getTimestamp()) {
invokeFinishBundle();
+ // Sleep for 5s to wait for any timer to be fired.
+ Thread.sleep(5000);
Review comment:
I found that `Thread.sleep(5000)` is not a correct solution. The reason it
worked occasionally is that the bundle was small enough and I got lucky with
the race condition. I think we should figure out a way to fire the SDF
processing-time timer when a bundle is closed within the life cycle of one
`ExecutableStageDoFnOperator`.
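To make the race concrete, here is a tiny standalone Java sketch; nothing in it
is Beam or Flink API, and the scheduled timer merely stands in for the
rescheduled SDF residual. A fixed sleep only "works" when the timer happens to
fire inside the window, whereas blocking on an explicit completion signal does
not depend on timing:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Standalone illustration of the race: a timer scheduled with an arbitrary
 * delay (standing in for an SDF resumeDelay) may or may not fire inside a
 * fixed sleep window, while waiting on an explicit signal is deterministic.
 */
public class SleepRaceSketch {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch residualProcessed = new CountDownLatch(1);

    // Stand-in for the SDF-requested resume: fires after a delay we do not control.
    long resumeDelayMillis = 7_000; // larger than the 5s sleep -> the sleep loses the race
    timerService.schedule(residualProcessed::countDown, resumeDelayMillis, TimeUnit.MILLISECONDS);

    // Racy version: correct only if resumeDelayMillis happens to be < 5s.
    Thread.sleep(5_000);
    System.out.println("after fixed sleep, processed = " + (residualProcessed.getCount() == 0));

    // Deterministic version: block until the timer has actually fired.
    residualProcessed.await();
    System.out.println("after await, processed = true");

    timerService.shutdownNow();
  }
}
```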
Please correct me if I'm understanding it wrong:
* Does Flink start all operators at the same time and close an operator when
its input watermark reaches MAX_TIMESTAMP, or does it close operators in
reverse topological order, with `close()` being a blocking call?
* The processing-time timers will no longer be fired by the system once
`operator.close()` has been invoked.
* The assumption around `ExecutableStageDoFnOperator` is that only one bundle
executes inside one operator; when the output watermark advances to
MAX_TIMESTAMP, we consider that bundle completed.
With support for SDF-initiated checkpoints, we do need several bundles to be
invoked within one `ExecutableStageDoFnOperator` life cycle, which means we
either:
* Enable Flink to fire processing-time timers after `Operator.close()` is
invoked -- this may not be preferable.
* Or we try to close the bundle before we reach `Operator.close()`.
* Or we manually drain the SDF timers at close, sacrificing the ability of
`resumeDelay()`: for example, the user may want to reschedule the SDF residuals
in 5 minutes, but we would have to fire them immediately (sketched below).
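A rough, non-authoritative sketch of that last option. The `PendingSdfTimer`
type and its `fire` callback are made-up stand-ins for however the operator
actually tracks residual timers; the only point is that `close()` fires
everything immediately and throws away the requested delay:

```java
import java.util.PriorityQueue;

/**
 * Sketch of the "drain SDF timers at close" option: every pending residual
 * is fired immediately, ignoring its scheduled fire time (i.e. giving up
 * resumeDelay()). All names here are hypothetical.
 */
public class DrainOnCloseSketch {

  /** Hypothetical pending timer: fireTimestampMillis is when the SDF asked to resume. */
  static final class PendingSdfTimer implements Comparable<PendingSdfTimer> {
    final long fireTimestampMillis;
    final Runnable fire;

    PendingSdfTimer(long fireTimestampMillis, Runnable fire) {
      this.fireTimestampMillis = fireTimestampMillis;
      this.fire = fire;
    }

    @Override
    public int compareTo(PendingSdfTimer other) {
      return Long.compare(fireTimestampMillis, other.fireTimestampMillis);
    }
  }

  private final PriorityQueue<PendingSdfTimer> pendingSdfTimers = new PriorityQueue<>();

  void schedule(long fireTimestampMillis, Runnable fire) {
    pendingSdfTimers.add(new PendingSdfTimer(fireTimestampMillis, fire));
  }

  /** Called from the operator's close(): fire everything now, regardless of requested delay. */
  void drainOnClose() {
    PendingSdfTimer timer;
    while ((timer = pendingSdfTimers.poll()) != null) {
      timer.fire.run();
    }
  }
}
```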
Do you have any ideas/suggestions? Thanks for your help!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 509532)
Time Spent: 11h 50m (was: 11h 40m)
> Portable Flink runner should handle DelayedBundleApplication from
> ProcessBundleResponse.
> ----------------------------------------------------------------------------------------
>
> Key: BEAM-10940
> URL: https://issues.apache.org/jira/browse/BEAM-10940
> Project: Beam
> Issue Type: New Feature
> Components: runner-flink
> Reporter: Boyuan Zhang
> Assignee: Boyuan Zhang
> Priority: P2
> Time Spent: 11h 50m
> Remaining Estimate: 0h
>
> SDF can produce residuals by self-checkpointing, which are returned to the
> runner via ProcessBundleResponse.DelayedBundleApplication. The portable runner
> should be able to handle the DelayedBundleApplication and reschedule it based
> on the timestamp.
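For illustration only, a minimal sketch of the rescheduling idea described in
the issue. The `Residual` interface and its `requestedResumeDelayMillis()`
accessor are hypothetical stand-ins for the information carried by the
DelayedBundleApplication; the actual runner integration is not shown:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * Sketch: turn the delay requested by an SDF self-checkpoint into a scheduled
 * re-execution, instead of dropping it or firing it immediately.
 */
public class RescheduleResidualSketch {

  /** Hypothetical view of a residual returned by the SDK harness. */
  interface Residual {
    long requestedResumeDelayMillis();
  }

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  /** Schedule the residual to be re-processed after the delay the SDF asked for. */
  void reschedule(Residual residual, Runnable processResidual) {
    long delayMillis = Math.max(0, residual.requestedResumeDelayMillis());
    scheduler.schedule(processResidual, delayMillis, TimeUnit.MILLISECONDS);
  }
}
```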
--
This message was sent by Atlassian Jira
(v8.3.4#803005)