[ 
https://issues.apache.org/jira/browse/BEAM-9733?focusedWorklogId=426653&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-426653
 ]

ASF GitHub Bot logged work on BEAM-9733:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Apr/20 17:25
            Start Date: 23/Apr/20 17:25
    Worklog Time Spent: 10m 
      Work Description: mxm commented on a change in pull request #11362:
URL: https://github.com/apache/beam/pull/11362#discussion_r413984518



##########
File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
##########
@@ -658,46 +671,69 @@ public void processWatermark1(Watermark mark) throws 
Exception {
       emitAllPushedBackData();
     }
 
-    setCurrentInputWatermark(mark.getTimestamp());
+    currentInputWatermark = mark.getTimestamp();
 
-    if (keyCoder == null) {
-      long potentialOutputWatermark = Math.min(getPushbackWatermarkHold(), 
currentInputWatermark);
-      if (potentialOutputWatermark > currentOutputWatermark) {
-        setCurrentOutputWatermark(potentialOutputWatermark);
-        emitWatermark(currentOutputWatermark);
-      }
-    } else {
-      // hold back by the pushed back values waiting for side inputs
-      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), 
mark.getTimestamp());
+    long inputWatermarkHold = 
applyInputWatermarkHold(getEffectiveInputWatermark());
+    if (keyCoder != null) {
+      timeServiceManager.advanceWatermark(new Watermark(inputWatermarkHold));
+    }
 
-      timeServiceManager.advanceWatermark(new 
Watermark(pushedBackInputWatermark));
+    long potentialOutputWatermark =
+        applyOutputWatermarkHold(
+            currentOutputWatermark, 
computeOutputWatermark(inputWatermarkHold));
+    maybeEmitWatermark(potentialOutputWatermark);
+  }
 
-      Instant watermarkHold = keyedStateInternals.watermarkHold();
+  /**
+   * Allows to apply a hold to the input watermark. By default, just passes 
the input watermark
+   * through.
+   */
+  public long applyInputWatermarkHold(long inputWatermark) {
+    return inputWatermark;
+  }
 
-      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), 
getPushbackWatermarkHold());
-      combinedWatermarkHold =
-          Math.min(combinedWatermarkHold, 
timerInternals.getMinOutputTimestampMs());
-      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, 
combinedWatermarkHold);
+  /**
+   * Allows to apply a hold to the output watermark before it is send out. By 
default, just passes
+   * the potential output watermark through which will make it the new output 
watermark.
+   *
+   * @param currentOutputWatermark the current output watermark
+   * @param potentialOutputWatermark The potential new output watermark which 
can be adjusted, if
+   *     needed. The input watermark hold has already been applied.
+   * @return The new output watermark which will be emitted.
+   */
+  public long applyOutputWatermarkHold(long currentOutputWatermark, long 
potentialOutputWatermark) {
+    return potentialOutputWatermark;
+  }
 
-      if (potentialOutputWatermark > currentOutputWatermark) {
-        setCurrentOutputWatermark(potentialOutputWatermark);
-        emitWatermark(currentOutputWatermark);
-      }
+  private long computeOutputWatermark(long inputWatermarkHold) {
+    final long potentialOutputWatermark;
+    if (keyCoder == null) {
+      potentialOutputWatermark = inputWatermarkHold;
+    } else {
+      Instant watermarkHold = keyedStateInternals.watermarkHold();
+      long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), 
inputWatermarkHold);
+      potentialOutputWatermark =
+          Math.min(combinedWatermarkHold, 
timerInternals.getMinOutputTimestampMs());
     }
+    return potentialOutputWatermark;
   }
 
-  private void emitWatermark(long watermark) {
-    // Must invoke finishBatch before emit the +Inf watermark otherwise there 
are some late events.
-    if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
-      invokeFinishBundle();
+  private void maybeEmitWatermark(long watermark) {
+    if (watermark > currentOutputWatermark) {
+      // Must invoke finishBatch before emit the +Inf watermark otherwise 
there are some late
+      // events.
+      if (watermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+        invokeFinishBundle();
+      }
+      LOG.debug("Emitting watermark {}", watermark);
+      currentOutputWatermark = watermark;
+      output.emitWatermark(new Watermark(watermark));
     }
-    output.emitWatermark(new Watermark(watermark));
   }
 
   @Override
-  public void processWatermark2(Watermark mark) throws Exception {
-
-    setCurrentSideInputWatermark(mark.getTimestamp());
+  public final void processWatermark2(Watermark mark) throws Exception {
+    currentSideInputWatermark = mark.getTimestamp();

Review comment:
       I agree, that is something we should address.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 426653)
    Time Spent: 9h 40m  (was: 9.5h)

> ImpulseSourceFunction does not emit a final watermark
> -----------------------------------------------------
>
>                 Key: BEAM-9733
>                 URL: https://issues.apache.org/jira/browse/BEAM-9733
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Critical
>             Fix For: 2.21.0
>
>          Time Spent: 9h 40m
>  Remaining Estimate: 0h
>
> The Flink Runner's {{ImpulseSourceFunction}} does not emit a final watermark 
> unless the {{--shutdownSourcesOnFinalWatermark}} flag has been specified (the 
> flag is used in tests to shut down the pipeline after reading all data). Most 
> pipelines will be long-running and thus do not specify the flag. 
> Not sending out the final watermark causes GroupByKey to hold back the data 
> of event time windows until the pipeline is shut down (the final watermark is 
> always emitted on pipeline shutdown, which is why using the above flag works).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to