chamikaramj commented on code in PR #38346:
URL: https://github.com/apache/beam/pull/38346#discussion_r3198310496


##########
runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java:
##########
@@ -50,12 +51,27 @@ public Result(
         @Nullable RestrictionT residualRestriction,
         DoFn.ProcessContinuation continuation,
         @Nullable Instant futureOutputWatermark,
-        @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) {
+        @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState,
+        double backlogBytes) {
       checkArgument(continuation != null, "continuation must not be null");
       this.continuation = continuation;
       this.residualRestriction = residualRestriction;
       this.futureOutputWatermark = futureOutputWatermark;
       this.futureWatermarkEstimatorState = futureWatermarkEstimatorState;
+      this.backlogBytes = backlogBytes;
+    }
+
+    public Result(
+        @Nullable RestrictionT residualRestriction,
+        DoFn.ProcessContinuation continuation,
+        @Nullable Instant futureOutputWatermark,
+        @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) {
+      this(
+          residualRestriction,
+          continuation,
+          futureOutputWatermark,
+          futureWatermarkEstimatorState,
+          -1.0);

Review Comment:
   Probably move `-1.0` to a named constant or add a comment explaining what it means.
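   For example, something along these lines would make the meaning of `-1.0` explicit in the four-argument constructor. This is a minimal sketch; `SizedResult`, `UNKNOWN_BACKLOG_BYTES` and `hasBacklogBytes()` are hypothetical names, not existing Beam API:

```java
/**
 * Hypothetical sketch: name the "no size reported" sentinel instead of passing a bare -1.0.
 * SizedResult and UNKNOWN_BACKLOG_BYTES are illustrative names, not Beam API.
 */
final class SizedResult {
  /** Sentinel meaning "backlog size unknown / GetSize not consulted". */
  static final double UNKNOWN_BACKLOG_BYTES = -1.0;

  private final double backlogBytes;

  SizedResult(double backlogBytes) {
    this.backlogBytes = backlogBytes;
  }

  /** Constructor for callers with no size information: they get the named sentinel. */
  SizedResult() {
    this(UNKNOWN_BACKLOG_BYTES);
  }

  double getBacklogBytes() {
    return backlogBytes;
  }

  /** Lets callers check for a real value without comparing against the raw literal. */
  boolean hasBacklogBytes() {
    return backlogBytes != UNKNOWN_BACKLOG_BYTES;
  }
}
```

   A helper like `hasBacklogBytes()` also keeps the sentinel check in one place for downstream consumers.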



##########
runners/core-java/src/test/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvokerTest.java:
##########
@@ -95,6 +95,30 @@ public OffsetRange getInitialRestriction(@SuppressWarnings("unused") @Element Vo
     }
   }
 
+  private static class GetSizeFn extends DoFn<Void, String> {
+    @ProcessElement
+    public ProcessContinuation process(
+        ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
+      for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
+        c.output(String.valueOf(i));
+        if (i == 2) {
+          return resume();
+        }
+      }
+      return stop();
+    }
+
+    @GetInitialRestriction
+    public OffsetRange getInitialRestriction() {
+      return new OffsetRange(0, 10);
+    }
+
+    @GetSize

Review Comment:
   Seems like implementing GetSize is optional: https://github.com/apache/beam/blob/3a66beeec82b11972476a4418607bc2971b0e8b9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L1088
   
   Have we considered the case where this is not implemented? Also, let's add a test for this.
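   A test could exercise both paths. As a plain-Java sketch of the decision involved (hypothetical names; the fallback order below, user `@GetSize` first, then tracker-reported work remaining, then "unknown", is an assumption to check against the default `GetSize` behavior described in the DoFn.java link above):

```java
import java.util.OptionalDouble;
import java.util.function.DoubleSupplier;

/**
 * Hypothetical sketch of choosing a backlog estimate when the DoFn may not implement @GetSize.
 * None of these names exist in Beam; the fallback order is an assumption.
 */
final class BacklogFallback {
  static final double UNKNOWN_BACKLOG_BYTES = -1.0;

  /**
   * @param userGetSize the DoFn's @GetSize method, or null if it is not implemented
   * @param trackerWorkRemaining work remaining reported by the restriction tracker, if any
   */
  static double backlogBytes(DoubleSupplier userGetSize, OptionalDouble trackerWorkRemaining) {
    if (userGetSize != null) {
      // The DoFn provided its own size estimate.
      return userGetSize.getAsDouble();
    }
    if (trackerWorkRemaining.isPresent()) {
      // No @GetSize: fall back to what the tracker reports.
      return trackerWorkRemaining.getAsDouble();
    }
    // Neither source is available: report "unknown" rather than a made-up size.
    return UNKNOWN_BACKLOG_BYTES;
  }
}
```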



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -278,8 +278,109 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
     if (residual == null) {
       return new Result(null, cont, null, null);
     }
+    final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residualForGetSize = residual;
+    // For a list of all DoFnInvoker arguments, see DoFn.java.
+    double backlogBytes =
+        invoker.invokeGetSize(
+            new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+              @Override
+              public String getErrorContext() {
+                return OutputAndTimeBoundedSplittableProcessElementInvoker.class.getSimpleName()
+                    + "/GetSize";
+              }
+
+              @Override
+              public InputT element(DoFn<InputT, OutputT> doFn) {
+                return element.getValue();
+              }
+
+              @Override
+              public Object restriction() {
+                return residualForGetSize.getKey();
+              }
+
+              @Override
+              public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                return element.getTimestamp();
+              }
+
+              @Override
+              public RestrictionTracker<?, ?> restrictionTracker() {
+                return invoker.invokeNewTracker(
+                    new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+                      @Override
+                      public String getErrorContext() {
+                        return OutputAndTimeBoundedSplittableProcessElementInvoker.class
+                                .getSimpleName()
+                            + "/NewTracker";
+                      }
+
+                      @Override
+                      public InputT element(DoFn<InputT, OutputT> doFn) {
+                        return element.getValue();
+                      }
+
+                      @Override
+                      public Object restriction() {
+                        return residualForGetSize.getKey();
+                      }
+
+                      @Override
+                      public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                        return element.getTimestamp();
+                      }
+
+                      @Override
+                      public BoundedWindow window() {
+                        throw new IllegalStateException(
+                            "Attempting to access window outside of a windowed context");
+                      }
+
+                      @Override
+                      public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+                        throw new IllegalStateException(
+                            "Attempting to access PaneInfo outside of a windowed context");
+                      }
+
+                      @Override
+                      public PipelineOptions pipelineOptions() {
+                        return pipelineOptions;
+                      }
+                    });
+              }
+
+              @Override
+              public BoundedWindow window() {
+                throw new IllegalStateException(
+                    "Attempting to access window outside of a windowed context");
+              }
+
+              @Override
+              public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+                throw new IllegalStateException(
+                    "Attempting to access PaneInfo outside of a windowed context");
+              }
+
+              @Override
+              public PipelineOptions pipelineOptions() {
+                return pipelineOptions;
+              }
+
+              @Override
+              public Object sideInput(String tagId) {
+                PCollectionView<?> view = sideInputMapping.get(tagId);
+                if (view == null) {
+                  throw new IllegalArgumentException("calling getSideInput() with unknown view");
+                }
+                return processContext.sideInput(view);
+              }
+            });
     return new Result(
-        residual.getKey(), cont, residual.getValue().getKey(), residual.getValue().getValue());
+        residual.getKey(),
+        cont,
+        residual.getValue().getKey(),
+        residual.getValue().getValue(),
+        backlogBytes);

Review Comment:
   Do we expect to get this information only during finishBundle?



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -278,8 +278,109 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
     if (residual == null) {
       return new Result(null, cont, null, null);
     }
     return new Result(
-        residual.getKey(), cont, residual.getValue().getKey(), residual.getValue().getValue());
+        residual.getKey(),
+        cont,
+        residual.getValue().getKey(),
+        residual.getValue().getValue(),
+        backlogBytes);

Review Comment:
   Should we do any form of validation of the value returned here before sending it to the runner? For example, ignore it if it is negative or zero (that would indicate a wrong implementation, and we probably don't want to pass it to the runner).
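   For example, a small sanitizing step along these lines. This is a minimal sketch; the class and method names are hypothetical, and it follows the suggestion above by treating zero, negative, NaN and infinite values as "unknown":

```java
/**
 * Hypothetical sketch: sanitize a user-supplied size before handing it to the runner.
 * Names are illustrative, not Beam API.
 */
final class BacklogValidation {
  static final double UNKNOWN_BACKLOG_BYTES = -1.0;

  static double sanitize(double reported) {
    // Double.isFinite rejects NaN as well as positive and negative infinity.
    if (!Double.isFinite(reported) || reported <= 0.0) {
      // Likely a buggy @GetSize implementation: report "unknown" instead.
      return UNKNOWN_BACKLOG_BYTES;
    }
    return reported;
  }
}
```

   Whether zero should really be dropped is a judgment call; with a non-null residual it is at least suspicious.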



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java:
##########
@@ -528,6 +529,11 @@ public Map<Long, Pair<Instant, Runnable>> flushState() {
           getWorkItem().getWorkToken(),
           activeReader);
       activeReader = null;
+    } else if (backlogBytes != UnboundedReader.BACKLOG_UNKNOWN && backlogBytes != 1L) {

Review Comment:
   Good point :)
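   The condition above could also be pulled into a named predicate so the reason for excluding `1L` is documented in code. A sketch, assuming `UnboundedReader.BACKLOG_UNKNOWN` is `-1L` and that a size of exactly 1 is a default placeholder rather than a measurement (both assumptions; the class and constant names are hypothetical):

```java
/**
 * Hypothetical sketch of the reporting gate in the diff above, as a named predicate.
 * BACKLOG_UNKNOWN mirrors UnboundedReader.BACKLOG_UNKNOWN (assumed to be -1L here).
 * PLACEHOLDER_SIZE assumes a size of exactly 1 is a default estimate, not real data.
 */
final class BacklogReportGate {
  static final long BACKLOG_UNKNOWN = -1L;
  static final long PLACEHOLDER_SIZE = 1L;

  /** Report only values that look like real measurements. */
  static boolean shouldReport(long backlogBytes) {
    return backlogBytes != BACKLOG_UNKNOWN && backlogBytes != PLACEHOLDER_SIZE;
  }
}
```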



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java:
##########
@@ -278,8 +278,109 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
     if (residual == null) {
       return new Result(null, cont, null, null);
     }
+    final KV<RestrictionT, KV<Instant, WatermarkEstimatorStateT>> residualForGetSize = residual;
+    // For a list of all DoFnInvoker arguments, see DoFn.java.
+    double backlogBytes =
+        invoker.invokeGetSize(

Review Comment:
   How about creating a util to get information from the residual instead of creating an inline class here? Probably also refactor other similar places, if any.
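   Roughly along these lines, with one factory shared by the GetSize and NewTracker calls. This is a plain-Java sketch: `ResidualArgs`, `ArgProvider` and `forResidual` are hypothetical stand-ins, and the real util would implement `DoFnInvoker.BaseArgumentProvider` instead:

```java
/**
 * Hypothetical sketch: one factory builds the argument provider for a residual, so GetSize and
 * NewTracker can share it instead of repeating two anonymous classes. ArgProvider is a
 * stand-in for the real DoFnInvoker argument provider, not the actual interface.
 */
final class ResidualArgs {
  interface ArgProvider<ElemT, RestrT> {
    ElemT element();

    RestrT restriction();

    long timestampMillis();

    String errorContext();

    /** Shared behavior: no window is available outside a windowed context. */
    default Object window() {
      throw new IllegalStateException("Attempting to access window outside of a windowed context");
    }
  }

  static <ElemT, RestrT> ArgProvider<ElemT, RestrT> forResidual(
      String errorContext, ElemT element, RestrT residual, long timestampMillis) {
    return new ArgProvider<ElemT, RestrT>() {
      @Override
      public ElemT element() {
        return element;
      }

      @Override
      public RestrT restriction() {
        return residual;
      }

      @Override
      public long timestampMillis() {
        return timestampMillis;
      }

      @Override
      public String errorContext() {
        return errorContext;
      }
    };
  }
}
```

   Each call site would then differ only in its error-context suffix (for example `"/GetSize"` vs `"/NewTracker"`).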


