[ 
https://issues.apache.org/jira/browse/BEAM-10940?focusedWorklogId=505427&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-505427
 ]

ASF GitHub Bot logged work on BEAM-10940:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Oct/20 22:18
            Start Date: 27/Oct/20 22:18
    Worklog Time Spent: 10m 
      Work Description: boyuanzz commented on a change in pull request #13105:
URL: https://github.com/apache/beam/pull/13105#discussion_r513066466



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -1330,6 +1331,14 @@ private void onNewEventTimer(TimerData newTimer) {
       }
     }
 
+    /** Holds the watermark when there is an sdf timer. */
+    private void onNewSdfTimer(TimerData newTimer) {
+      Preconditions.checkState(
+          
StateAndTimerBundleCheckpointHandler.isSdfTimer(newTimer.getTimerId()));
+      Preconditions.checkState(timerUsesOutputTimestamp(newTimer));
+      keyedStateInternals.addWatermarkHoldUsage(newTimer.getOutputTimestamp());
+    }

Review comment:
       Sorry I want to revisit the idea of having `onFiredTimer` here. I think 
it's a good idea to have `onFiredTimer` for firing timers. But the function 
`onNewSdfTimer` and `onNewEventTimer` is about to set watermark hold when 
registering timers. Different from event timer, an SDF timer must have the 
output timestamp for controlling watermark hold. It's important for SDF 
execution. That's why we have a check instead of an if block here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 505427)
    Time Spent: 8h  (was: 7h 50m)

> Portable Flink runner should handle DelayedBundleApplication from 
> ProcessBundleResponse.
> ----------------------------------------------------------------------------------------
>
>                 Key: BEAM-10940
>                 URL: https://issues.apache.org/jira/browse/BEAM-10940
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-flink
>            Reporter: Boyuan Zhang
>            Assignee: Boyuan Zhang
>            Priority: P2
>          Time Spent: 8h
>  Remaining Estimate: 0h
>
> SDF can produce residuals by self-checkpoint, which will be returned to 
> runner by ProcessBundleResponse.DelayedBundleApplication. The portable runner 
> should be able to handle the DelayedBundleApplication and reschedule it based 
> on the timestamp.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to