[ https://issues.apache.org/jira/browse/BEAM-9733?focusedWorklogId=426392&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-426392 ]

ASF GitHub Bot logged work on BEAM-9733:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/20 06:05
            Start Date: 23/Apr/20 06:05
    Worklog Time Spent: 10m 
      Work Description: tweise commented on a change in pull request #11362:
URL: https://github.com/apache/beam/pull/11362#discussion_r413533148



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
##########
@@ -544,30 +601,49 @@ public void processWatermark(Watermark mark) throws Exception {
     // every watermark. So we have implemented 2) below.
     //
     if (sdkHarnessRunner.isBundleInProgress()) {
-      if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-        invokeFinishBundle();
-        setPushedBackWatermark(Long.MAX_VALUE);
+      if (minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE) {
+        // We can safely advance the watermark to before the last bundle's minimum event timer
+        // but not past the potential output watermark which includes holds to the input watermark.
+        return Math.min(minEventTimeTimerTimestampInLastBundle - 1, potentialOutputWatermark);
       } else {
-        // It is not safe to advance the output watermark yet, so add a hold on the current
-        // output watermark.
-        backupWatermarkHold = Math.max(backupWatermarkHold, getPushbackWatermarkHold());
-        setPushedBackWatermark(Math.min(currentOutputWatermark, backupWatermarkHold));
-        super.setBundleFinishedCallback(
-            () -> {
-              try {
-                LOG.debug("processing pushed back watermark: {}", mark);
-                // at this point the bundle is finished, allow the watermark to pass
-                // we are restoring the previous hold in case it was already set for side inputs
-                setPushedBackWatermark(backupWatermarkHold);
-                super.processWatermark(mark);
-              } catch (Exception e) {
-                throw new RuntimeException(
-                    "Failed to process pushed back watermark after finished bundle.", e);
-              }
-            });
+        // We don't have any information yet, use the current output watermark for now.
+        return currentOutputWatermark;
+      }
+    } else {
+      // No bundle was started when we advanced the input watermark.
+      // Thus, we can safely set a new output watermark.
+      return potentialOutputWatermark;
+    }
+  }
+
+  private void preBundleStartCallback() {
+    inputWatermarkBeforeBundleStart = getEffectiveInputWatermark();
+  }
+
+  @SuppressWarnings("FutureReturnValueIgnored")
+  private void finishBundleCallback() {
+    minEventTimeTimerTimestampInLastBundle = minEventTimeTimerTimestampInCurrentBundle;
+    minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
+    try {
+      if (!closed
+          && minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE
+          && minEventTimeTimerTimestampInLastBundle <= getEffectiveInputWatermark()) {
+        ProcessingTimeService processingTimeService = getProcessingTimeService();
+        // We are scheduling a timer for advancing the watermark. Otherwise we
+        // could potentially loop forever here when a timer keeps scheduling a timer
+        // for the same timestamp. This in itself would not be an issue. However,

Review comment:
       The comment is a bit misleading. How about: "We are scheduling a timer 
for advancing the watermark, to not delay finishing the bundle and temporarily 
release the checkpoint lock. Otherwise, we could potentially loop when a timer 
keeps scheduling a timer for the same timestamp."
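The watermark logic in the diff excerpt above can be illustrated with a minimal, self-contained Java sketch. It is not the actual ExecutableStageDoFnOperator code: the class WatermarkHoldSketch, its fields and methods, and the deferredTasks queue are hypothetical, and the plain task queue stands in for Flink's ProcessingTimeService. It models the three branches for choosing the output watermark and how the finish-bundle callback enqueues the watermark advancement instead of performing it inline, which is the looping concern the review comment is about.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified model of the logic in the diff above; not Beam code.
// A plain task queue stands in for Flink's ProcessingTimeService.
class WatermarkHoldSketch {
  boolean bundleInProgress = false;
  long currentOutputWatermark = Long.MIN_VALUE;
  long minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
  long minEventTimeTimerTimestampInLastBundle = Long.MAX_VALUE;
  final Deque<Runnable> deferredTasks = new ArrayDeque<>();
  int watermarkAdvances = 0;

  // Mirrors the three branches in the diff for picking the next output watermark.
  long computeOutputWatermark(long potentialOutputWatermark) {
    if (bundleInProgress) {
      if (minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE) {
        // Stay just before the earliest event-time timer of the last bundle and
        // never past the potential output watermark (which includes holds).
        return Math.min(minEventTimeTimerTimestampInLastBundle - 1, potentialOutputWatermark);
      }
      // No timer information yet: keep the current output watermark.
      return currentOutputWatermark;
    }
    // No bundle in progress: the potential output watermark is safe to emit.
    return potentialOutputWatermark;
  }

  // Tracks the earliest event-time timer set while the current bundle runs.
  void onEventTimeTimerSet(long timestamp) {
    minEventTimeTimerTimestampInCurrentBundle =
        Math.min(minEventTimeTimerTimestampInCurrentBundle, timestamp);
  }

  // Rotates the per-bundle bookkeeping, then defers watermark advancement.
  void finishBundle(long effectiveInputWatermark) {
    bundleInProgress = false;
    minEventTimeTimerTimestampInLastBundle = minEventTimeTimerTimestampInCurrentBundle;
    minEventTimeTimerTimestampInCurrentBundle = Long.MAX_VALUE;
    if (minEventTimeTimerTimestampInLastBundle < Long.MAX_VALUE
        && minEventTimeTimerTimestampInLastBundle <= effectiveInputWatermark) {
      // Enqueue instead of advancing inline, so a timer that keeps re-setting
      // itself for the same timestamp cannot keep us looping in this callback.
      deferredTasks.add(() -> watermarkAdvances++);
    }
  }

  // Runs queued work later, the way a processing-time timer would fire.
  void runDeferredTasks() {
    Runnable task;
    while ((task = deferredTasks.poll()) != null) {
      task.run();
    }
  }

  public static void main(String[] args) {
    WatermarkHoldSketch op = new WatermarkHoldSketch();
    op.currentOutputWatermark = 100;
    op.bundleInProgress = true;
    System.out.println(op.computeOutputWatermark(500)); // 100 (no timer info yet)
    op.onEventTimeTimerSet(300);
    op.finishBundle(400);
    System.out.println(op.deferredTasks.size()); // 1
    op.runDeferredTasks();
    System.out.println(op.watermarkAdvances); // 1
    op.bundleInProgress = true;
    System.out.println(op.computeOutputWatermark(500)); // 299
    op.bundleInProgress = false;
    System.out.println(op.computeOutputWatermark(500)); // 500
  }
}
```

The design point the sketch tries to capture is that finishBundle only records that an advancement is due; the actual work happens on a later, separate invocation, so the bundle can finish without waiting on it.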




----------------------------------------------------------------
This is an automated message from the Apache Git Service.


Issue Time Tracking
-------------------

    Worklog Id:     (was: 426392)
    Time Spent: 8h 40m  (was: 8.5h)

> ImpulseSourceFunction does not emit a final watermark
> -----------------------------------------------------
>
>                 Key: BEAM-9733
>                 URL: https://issues.apache.org/jira/browse/BEAM-9733
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.21.0
>
>          Time Spent: 8h 40m
>  Remaining Estimate: 0h
>
> The Flink Runner's {{ImpulseSourceFunction}} does not emit a final watermark
> unless the {{--shutdownSourcesOnFinalWatermark}} flag has been specified (the
> flag is used in tests to shut down the pipeline after reading all data). Most
> pipelines are long-running and thus do not specify the flag.
> Not sending out the final watermark causes GroupByKey to hold back the data
> of event-time windows until the pipeline is shut down (the final watermark is
> always emitted on pipeline shutdown, which is why using the above flag works).
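The effect described in the issue can be modeled in plain Java. This is a hypothetical sketch, not Flink or Beam code: WindowBuffer stands in for GroupByKey's per-window buffering, runImpulse stands in for ImpulseSourceFunction, and the window end of 60,000 ms is an arbitrary assumption. The final watermark is represented as Long.MAX_VALUE, as Flink does. Without the final watermark the buffered element is never emitted; emitting it releases the window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical plain-Java model (no Flink or Beam classes) of an impulse source
// feeding a GroupByKey-like buffer that fires windows on the watermark.
final class FinalWatermarkSketch {
  // Flink represents the final watermark as Long.MAX_VALUE.
  static final long FINAL_WATERMARK = Long.MAX_VALUE;
  // Arbitrary event-time window end chosen for this sketch.
  static final long WINDOW_END = 60_000L;

  // Stand-in for GroupByKey: buffers elements until the watermark passes their window.
  static final class WindowBuffer {
    final TreeMap<Long, List<String>> pendingByWindowEnd = new TreeMap<>();
    final List<String> fired = new ArrayList<>();

    void add(String element, long windowEnd) {
      pendingByWindowEnd.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(element);
    }

    // Emits every buffered window whose end is at or before the watermark.
    void onWatermark(long watermark) {
      while (!pendingByWindowEnd.isEmpty() && pendingByWindowEnd.firstKey() <= watermark) {
        fired.addAll(pendingByWindowEnd.pollFirstEntry().getValue());
      }
    }
  }

  // Stand-in for the impulse source: one element, then optionally the final watermark.
  static void runImpulse(WindowBuffer downstream, boolean emitFinalWatermark) {
    downstream.add("impulse", WINDOW_END);
    if (emitFinalWatermark) {
      downstream.onWatermark(FINAL_WATERMARK);
    }
    // Shutdown is not modeled: without the final watermark nothing advances
    // event time here, so the window stays buffered.
  }

  public static void main(String[] args) {
    WindowBuffer withoutFinal = new WindowBuffer();
    runImpulse(withoutFinal, false);
    System.out.println(withoutFinal.fired); // []

    WindowBuffer withFinal = new WindowBuffer();
    runImpulse(withFinal, true);
    System.out.println(withFinal.fired); // [impulse]
  }
}
```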



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
