lukecwik commented on code in PR #22889:
URL: https://github.com/apache/beam/pull/22889#discussion_r1002216474


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:
##########
@@ -760,15 +769,18 @@ public long applyInputWatermarkHold(long inputWatermark) {
   }
 
   /**
-   * Allows to apply a hold to the output watermark before it is send out. By 
default, just passes
-   * the potential output watermark through which will make it the new output 
watermark.
+   * Allows to apply a hold to the output watermark before it is send out. 
Used to apply hold on

Review Comment:
   ```suggestion
      * Allows to apply a hold to the output watermark before it is sent out. 
Used to apply hold on
   ```



##########
runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java:
##########
@@ -38,10 +38,15 @@
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.Impulse;
+import org.apache.beam.sdk.transforms.MapElements;

Review Comment:
   Can we instead run the RequiresStableInputIT instead?
   
   
https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/sdks/java/core/src/test/java/org/apache/beam/sdk/RequiresStableInputIT.java



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:
##########
@@ -326,6 +324,17 @@ public DoFnOperator(
     this.finishBundleBeforeCheckpointing = 
flinkOptions.getFinishBundleBeforeCheckpointing();
   }
 
+  private boolean isRequiresStableInput(DoFn<InputT, OutputT> doFn) {
+    // WindowDoFnOperator does not use a DoFn
+    return doFn != null

Review Comment:
   This annotation can be added to other parts of the DoFn like onTimer as per: 
https://github.com/apache/beam/blob/8dd87491dcb2660f859f87b3336adb984c62c12d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L803
   
   Is `@ProcessElement` the only place we are trying to add support for it?
   
   If you copied this code from 
https://github.com/apache/beam/blob/434427e90b55027c5944fa73de68bff4f9a4e8fe/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java#L360,
 then that code is wrong based upon the specification.



##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java:
##########
@@ -85,6 +86,8 @@ public static <InputT, OutputT> BufferingDoFnRunner<InputT, 
OutputT> create(
   int currentStateIndex;
   /** The current handler used for buffering. */
   private BufferingElementsHandler currentBufferingElementsHandler;
+  /** Minimum timestamp of all buffered elements. */
+  private volatile long currentOutputWatermarkHold;

Review Comment:
   Should we make this optional so that we can differentiate in the case where 
the are no elements to prevent the watermark from advancing to Long.MAX_VALUE?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to