lukecwik commented on a change in pull request #12016:
URL: https://github.com/apache/beam/pull/12016#discussion_r456038848



##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1053,6 +1057,59 @@ public Duration getAllowedTimestampSkew() {
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface SplitRestriction {}
 
+  /**
+   * Annotation for the method that truncates the restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into a bounded one.
+   * This method is invoked when a pipeline is being <a
+   * href="https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#">drained</a>.
+   *
+   * <p>This method is used to perform truncation of the restriction while it 
is not actively being
+   * processed.
+   *
+   * <p>Signature: {@code TruncateResult<RestrictionT> 
truncateRestriction(<arguments>);}
+   *
+   * <p>This method must satisfy the following constraints:
+   *
+   * <ul>
+   *   <li>If one of the arguments is of type {@link OutputReceiver}, then it 
will be passed an
+   *       output receiver for outputting the truncated restrictions. All 
truncated restrictions
+   *       must be output through this parameter.

Review comment:
       ```suggestion
   ```

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -69,6 +69,29 @@ public static TransformReplacement createSizedReplacement() {
     return SizedReplacement.INSTANCE;
   }
 
+  /**
+   * Returns a transform replacement in drain mode which expands a splittable 
ParDo from:
+   *
+   * <pre>{@code
+   * sideInputA ---------\
+   * sideInputB ---------V
+   * mainInput ---> SplittableParDo --> outputA
+   *                                \-> outputB
+   * }</pre>
+   *
+   * into:
+   *
+   * <pre>{@code
+   * sideInputA 
---------\---------------------\----------------------\--------------------------\
+   * sideInputB 
---------V---------------------V----------------------V--------------------------V
+   * mainInput ---> PairWithRestriction --> SplitAndSize --> TruncateAndSize 
--> ProcessSizedElementsAndRestriction --> outputA
+   *                                                                           
                                    \-> outputB

Review comment:
       ```suggestion
      *                                                                         
                                       \-> outputB
   ```

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1053,6 +1057,59 @@ public Duration getAllowedTimestampSkew() {
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface SplitRestriction {}
 
+  /**
+   * Annotation for the method that truncates the restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into a bounded one.
+   * This method is invoked when a pipeline is being <a
+   * href="https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#">drained</a>.
+   *
+   * <p>This method is used to perform truncation of the restriction while it 
is not actively being
+   * processed.
+   *
+   * <p>Signature: {@code TruncateResult<RestrictionT> 
truncateRestriction(<arguments>);}
+   *
+   * <p>This method must satisfy the following constraints:
+   *
+   * <ul>
+   *   <li>If one of the arguments is of type {@link OutputReceiver}, then it 
will be passed an
+   *       output receiver for outputting the truncated restrictions. All 
truncated restrictions
+   *       must be output through this parameter.
+   *   <li>If one of its arguments is tagged with the {@link Element} 
annotation, then it will be
+   *       passed the current element being processed; the argument must be of 
type {@code InputT}.
+   *       Note that automatic conversion of {@link Row}s and {@link 
FieldAccess} parameters are
+   *       currently unsupported.
+   *   <li>If one of its arguments is tagged with the {@link Restriction} 
annotation, then it will
+   *       be passed the current restriction being processed; the argument 
must be of type {@code
+   *       RestrictionT}.
+   *   <li>If one of its arguments is tagged with the {@link Timestamp} 
annotation, then it will be
+   *       passed the timestamp of the current element being processed; the 
argument must be of type
+   *       {@link Instant}.
+   *   <li>If one of its arguments is a {@link RestrictionTracker}, then it 
will be passed a tracker
+   *       that is initialized for the current {@link Restriction}. The 
argument must be of the
+   *       exact type {@code RestrictionTracker<RestrictionT, PositionT>}.
+   *   <li>If one of its arguments is a subtype of {@link BoundedWindow}, then 
it will be passed the
+   *       window of the current element. When applied by {@link ParDo} the 
subtype of {@link
+   *       BoundedWindow} must match the type of windows on the input {@link 
PCollection}. If the
+   *       window is not accessed a runner may perform additional 
optimizations.
+   *   <li>If one of its arguments is of type {@link PaneInfo}, then it will 
be passed information
+   *       about the current triggering pane.
+   *   <li>If one of the parameters is of type {@link PipelineOptions}, then 
it will be passed the
+   *       options for the current pipeline.
+   * </ul>
+   *

Review comment:
       ```suggestion
      *
      * <p>Returns a truncated restriction representing a bounded amount of work that must be processed before the pipeline can be drained or {@code null} if no work is necessary.
      *
   ```
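
To make the suggested return contract concrete, here is a minimal sketch in plain Java. It does not use the Beam API: `Range` and `truncate` are hypothetical names, an end of `Long.MAX_VALUE` is assumed to mark an unbounded range, and dropping unbounded work is only one possible policy chosen for illustration.

```java
class TruncateSketch {
  /** Hypothetical restriction: the half-open range [from, to). */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    /** Assumption: an end of Long.MAX_VALUE marks an unbounded range. */
    boolean isBounded() {
      return to != Long.MAX_VALUE;
    }
  }

  /**
   * Returns the bounded work that must finish before draining,
   * or null if no work is necessary.
   */
  static Range truncate(Range restriction) {
    if (!restriction.isBounded()) {
      // Assumed policy for this sketch: unbounded work is dropped on drain.
      return null;
    }
    if (restriction.from >= restriction.to) {
      // An empty range has nothing left to process.
      return null;
    }
    // A bounded, non-empty range is kept unchanged.
    return restriction;
  }

  public static void main(String[] args) {
    System.out.println(truncate(new Range(0, 10)) != null);
    System.out.println(truncate(new Range(0, Long.MAX_VALUE)) == null);
  }
}
```

A caller would treat a `null` result as "nothing to process" and skip emitting a restriction downstream.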

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,90 +493,70 @@
             || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
             || !sideInputMapping.isEmpty()) {
           mainInputConsumer = 
this::processElementForWindowObservingSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new WindowObservingProcessBundleContext() {
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+
+        } else {
+          mainInputConsumer = this::processElementForSplitRestriction;
+          this.processContext =
+              new SizedRestrictionNonWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateRestriction() != null
+                && doFnSignature.truncateRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                private final HandlesSplits splitDelegate =

Review comment:
       This logic assumes that there is always exactly one output and that it supports splitting. It is valid for a runner to insert gRPC ports between the truncate and process sized restriction transforms (and some may choose to do so).
   
   It would be best not to assume that there is exactly one downstream consumer and that it is splittable. Instead, check that these conditions are met within trySplit/getProgress. This is what BeamFnDataReadRunner does.
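
One way to read this request is sketched below in plain Java. The types are hypothetical stand-ins, not the harness's real `HandlesSplits` or `FnDataReceiver` interfaces, and the `0.0` fallback for progress is an assumption of the sketch. Instead of casting the first consumer when the receiver is constructed, the receiver looks for a single splittable consumer each time `trySplit` or `getProgress` is called.

```java
import java.util.List;

class DeferredSplitCheckSketch {
  /** Hypothetical stand-in for a downstream consumer of elements. */
  interface Receiver {
    void accept(String element);
  }

  /** Hypothetical stand-in for a consumer that can split and report progress. */
  interface Splittable {
    String trySplit(double fractionOfRemainder);

    double getProgress();
  }

  /** A consumer that supports splitting, used only to exercise the sketch. */
  static final class SplittableSink implements Receiver, Splittable {
    @Override
    public void accept(String element) {}

    @Override
    public String trySplit(double fractionOfRemainder) {
      return "split@" + fractionOfRemainder;
    }

    @Override
    public double getProgress() {
      return 0.7;
    }
  }

  /** Forwards elements; inspects the consumers only when a split is requested. */
  static final class ForwardingReceiver implements Receiver, Splittable {
    private final List<Receiver> consumers;

    ForwardingReceiver(List<Receiver> consumers) {
      this.consumers = consumers;
    }

    @Override
    public void accept(String element) {
      for (Receiver consumer : consumers) {
        consumer.accept(element);
      }
    }

    @Override
    public String trySplit(double fractionOfRemainder) {
      Splittable delegate = splittableDelegateOrNull();
      return delegate == null ? null : delegate.trySplit(fractionOfRemainder);
    }

    @Override
    public double getProgress() {
      Splittable delegate = splittableDelegateOrNull();
      // Assumption of this sketch: 0.0 means no progress information.
      return delegate == null ? 0.0 : delegate.getProgress();
    }

    /** Returns the only consumer if it is splittable, otherwise null. */
    private Splittable splittableDelegateOrNull() {
      if (consumers.size() != 1) {
        return null;
      }
      Receiver only = consumers.get(0);
      return (only instanceof Splittable) ? (Splittable) only : null;
    }
  }
}
```

With this shape, a non-splittable consumer (for example, a gRPC write inserted by a runner) makes `trySplit` return `null` rather than failing with a `ClassCastException` at construction time.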

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,90 +493,70 @@
             || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
             || !sideInputMapping.isEmpty()) {
           mainInputConsumer = 
this::processElementForWindowObservingSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new WindowObservingProcessBundleContext() {
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+
+        } else {
+          mainInputConsumer = this::processElementForSplitRestriction;
+          this.processContext =
+              new SizedRestrictionNonWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateRestriction() != null
+                && doFnSignature.truncateRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                private final HandlesSplits splitDelegate =
+                    (HandlesSplits) Iterables.get(mainOutputConsumers, 0);
+
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  processElementForWindowObservingTruncateRestriction(input);
+                }
+
+                @Override
+                public SplitResult trySplit(double fractionOfRemainder) {
+                  return splitDelegate.trySplit(fractionOfRemainder);
+                }
+
                 @Override
-                public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> 
doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> 
restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, 
currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentWindow,
-                              currentElement.getPane()));
+                public double getProgress() {
+                  return splitDelegate.getProgress();
                 }
               };
-        } else {
-          mainInputConsumer = this::processElementForSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new NonWindowObservingProcessBundleContext() {
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN);
+        } else {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                private final HandlesSplits splitDelegate =
+                    (HandlesSplits) Iterables.get(mainOutputConsumers, 0);
+
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  processElementForTruncateRestriction(input);
+                }
+
                 @Override
-                public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> 
doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> 
restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, 
currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentElement.getWindows(),
-                              currentElement.getPane()));
+                public SplitResult trySplit(double fractionOfRemainder) {
+                  return splitDelegate.trySplit(fractionOfRemainder);

Review comment:
       We should return null here and leave a TODO referencing BEAM-10303 for the window observing optimization.
   
   Because of window exploding, we would split only on the current window, which returns the wrong primary/residual since it does not take into account the other windows that have already been fully processed or have yet to be started.
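
The effect described above can be illustrated with a small numeric model. This is a simplified sketch, not harness code: the method names are hypothetical, and it assumes every window of the exploded element carries the same amount of work and that `fractionOfRemainder` is the share of the remaining work kept by the primary.

```java
class WindowSplitSketch {
  /** Residual work if the split only sees the window currently being processed. */
  static double residualCurrentWindowOnly(
      double workPerWindow, double doneInCurrentWindow, double fractionOfRemainder) {
    double remaining = workPerWindow - doneInCurrentWindow;
    return remaining * (1 - fractionOfRemainder);
  }

  /** Residual work if the windows that have not started yet are counted too. */
  static double residualAllWindows(
      double workPerWindow,
      double doneInCurrentWindow,
      int windowsNotStarted,
      double fractionOfRemainder) {
    double remaining =
        (workPerWindow - doneInCurrentWindow) + windowsNotStarted * workPerWindow;
    return remaining * (1 - fractionOfRemainder);
  }

  public static void main(String[] args) {
    // 100 units per window, 40 done in the current window, 1 window not started.
    System.out.println(residualCurrentWindowOnly(100, 40, 0.5)); // 30.0
    System.out.println(residualAllWindows(100, 40, 1, 0.5)); // 80.0
  }
}
```

In this model the per-window split hands back 30 units as residual, while a split that accounts for the unstarted window would hand back 80, which is why returning null is the safer choice until the optimization exists.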

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -273,4 +296,211 @@ private static String generateUniqueId(String prefix, 
Predicate<String> isExisti
     }
     return prefix + i;
   }
+
+  /** See {@link #createTruncateReplacement()} ()} for details. */
+  private static class TruncateReplacement extends SizedReplacement {

Review comment:
       ```suggestion
     /** See {@link #createTruncateReplacement} for details. */
     private static class TruncateReplacement implements TransformReplacement {
   ```
   
   Do we get some value from extending `SizedReplacement`?

##########
File path: 
runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/graph/SplittableParDoExpander.java
##########
@@ -273,4 +296,211 @@ private static String generateUniqueId(String prefix, 
Predicate<String> isExisti
     }
     return prefix + i;
   }
+
+  /** See {@link #createTruncateReplacement()} ()} for details. */
+  private static class TruncateReplacement extends SizedReplacement {
+    private static final TruncateReplacement INSTANCE = new 
TruncateReplacement();
+
+    @Override
+    public MessageWithComponents getReplacement(
+        String transformId, ComponentsOrBuilder existingComponents) {
+      try {
+        MessageWithComponents.Builder rval = 
MessageWithComponents.newBuilder();
+
+        PTransform splittableParDo = 
existingComponents.getTransformsOrThrow(transformId);
+        ParDoPayload payload = 
ParDoPayload.parseFrom(splittableParDo.getSpec().getPayload());
+        // Only perform the expansion if this is a splittable DoFn.
+        if (payload.getRestrictionCoderId() == null || 
payload.getRestrictionCoderId().isEmpty()) {
+          return null;
+        }
+
+        String mainInputName = 
ParDoTranslation.getMainInputName(splittableParDo);

Review comment:
       We should be able to refactor this into a single private static method, shared with SizedReplacement, that takes a `boolean truncate`.
   
   Let's do this in a follow-up PR.

##########
File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
##########
@@ -1053,6 +1057,59 @@ public Duration getAllowedTimestampSkew() {
   @Experimental(Kind.SPLITTABLE_DO_FN)
   public @interface SplitRestriction {}
 
+  /**
+   * Annotation for the method that truncates the restriction of a <a
+   * href="https://s.apache.org/splittable-do-fn">splittable</a> {@link DoFn} into a bounded one.
+   * This method is invoked when a pipeline is being <a
+   * href="https://docs.google.com/document/d/1NExwHlj-2q2WUGhSO4jTu8XGhDPmm3cllSN8IMmWci8/edit#">drained</a>.
+   *
+   * <p>This method is used to perform truncation of the restriction while it 
is not actively being
+   * processed.
+   *
+   * <p>Signature: {@code TruncateResult<RestrictionT> 
truncateRestriction(<arguments>);}

Review comment:
       ```suggestion
      * <p>Signature: {@code @Nullable TruncateResult<RestrictionT> truncateRestriction(<arguments>);}
   ```

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
##########
@@ -99,6 +99,20 @@
    */
   public abstract void checkDone() throws IllegalStateException;
 
+  public enum IsBounded {
+    /** Indicates that a {@code Restriction} represents a bounded amount of 
work. */
+    BOUNDED,
+    /** Indicates that a {@code Restriction} represents an unbounded amount of 
work. */
+    UNBOUNDED
+  }
+
+  /**
+   * Return the boundedness of the current restriction. If the current 
restriction represents a
+   * finite amount of work, it should return {@link IsBounded#BOUNDED}. 
Otherwise, it should return
+   * {@link IsBounded#UNBOUNDED}.

Review comment:
       ```suggestion
      * {@link IsBounded#UNBOUNDED}.
      *
      * <p>It is valid to return {@link IsBounded#BOUNDED} after returning {@link IsBounded#UNBOUNDED} once
      * the end of a restriction is discovered. It is not valid to return {@link IsBounded#UNBOUNDED} after returning {@link IsBounded#BOUNDED}.
   ```
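
The one-way transition in this suggestion can be sketched with a small stand-alone tracker. The class below is hypothetical and does not extend Beam's `RestrictionTracker`. It assumes an end of `Long.MAX_VALUE` means the end is not yet known, and it rejects a second `discoverEnd` call so that it can never go from bounded back to unbounded.

```java
class BoundednessContractSketch {
  enum IsBounded {
    BOUNDED,
    UNBOUNDED
  }

  /** Hypothetical tracker for a range whose end may be discovered later. */
  static final class GrowableRangeTracker {
    // Assumption: Long.MAX_VALUE marks an end that is not yet known.
    private static final long UNKNOWN_END = Long.MAX_VALUE;

    private long end = UNKNOWN_END;

    /** Records the end once; it can never be reset afterwards. */
    void discoverEnd(long discoveredEnd) {
      if (end != UNKNOWN_END) {
        throw new IllegalStateException("end already discovered");
      }
      end = discoveredEnd;
    }

    /** UNBOUNDED until the end is discovered, BOUNDED from then on. */
    IsBounded isBounded() {
      return end == UNKNOWN_END ? IsBounded.UNBOUNDED : IsBounded.BOUNDED;
    }
  }

  public static void main(String[] args) {
    GrowableRangeTracker tracker = new GrowableRangeTracker();
    System.out.println(tracker.isBounded()); // UNBOUNDED
    tracker.discoverEnd(100);
    System.out.println(tracker.isBounded()); // BOUNDED
  }
}
```

Because the end can only be set once, the only possible sequence of results is zero or more `UNBOUNDED` values followed by `BOUNDED` values.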

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/GrowableOffsetRangeTracker.java
##########
@@ -140,4 +140,15 @@ public Progress getProgress() {
         totalWork.subtract(workRemaining, 
MathContext.DECIMAL128).doubleValue(),
         workRemaining.doubleValue());
   }
+
+  @Override
+  public RestrictionBoundness isBounded() {
+    // If current range has been done, the range should be bounded.
+    if (lastAttemptedOffset != null && lastAttemptedOffset == Long.MAX_VALUE) {

Review comment:
       I see.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,90 +493,70 @@
             || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
             || !sideInputMapping.isEmpty()) {
           mainInputConsumer = 
this::processElementForWindowObservingSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new WindowObservingProcessBundleContext() {
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+
+        } else {
+          mainInputConsumer = this::processElementForSplitRestriction;
+          this.processContext =
+              new SizedRestrictionNonWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateRestriction() != null
+                && doFnSignature.truncateRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                private final HandlesSplits splitDelegate =
+                    (HandlesSplits) Iterables.get(mainOutputConsumers, 0);
+
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  processElementForWindowObservingTruncateRestriction(input);
+                }
+
+                @Override
+                public SplitResult trySplit(double fractionOfRemainder) {
+                  return splitDelegate.trySplit(fractionOfRemainder);
+                }
+
                 @Override
-                public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> 
doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> 
restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, 
currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentWindow,
-                              currentElement.getPane()));
+                public double getProgress() {
+                  return splitDelegate.getProgress();
                 }
               };
-        } else {
-          mainInputConsumer = this::processElementForSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new NonWindowObservingProcessBundleContext() {
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN);
+        } else {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                private final HandlesSplits splitDelegate =
+                    (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
       Ditto: please don't assume a single splittable downstream consumer here, for the same reasons mentioned above.

##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -2892,4 +2899,492 @@ public void 
testProcessElementForWindowedSplitAndSizeRestriction() throws Except
     Iterables.getOnlyElement(teardownFunctions).run();
     assertThat(mainOutputValues, empty());
   }
+
+  private static SplitResult createSplitResult(double fractionOfRemainder) {
+    ByteString.Output primaryBytes = ByteString.newOutput();
+    ByteString.Output residualBytes = ByteString.newOutput();
+    try {
+      DoubleCoder.of().encode(fractionOfRemainder, primaryBytes);
+      DoubleCoder.of().encode(1 - fractionOfRemainder, residualBytes);
+    } catch (Exception e) {
+      // No-op.
+    }
+    return SplitResult.of(
+        ImmutableList.of(
+            
BundleApplication.newBuilder().setElement(primaryBytes.toByteString()).build()),
+        ImmutableList.of(
+            DelayedBundleApplication.newBuilder()
+                .setApplication(
+                    
BundleApplication.newBuilder().setElement(residualBytes.toByteString()).build())
+                .build()));
+  }
+
+  private static class SplittableFnDataReceiver
+      implements HandlesSplits, FnDataReceiver<WindowedValue> {
+    SplittableFnDataReceiver(
+        List<WindowedValue<KV<KV<String, OffsetRange>, Double>>> 
mainOutputValues) {
+      this.mainOutputValues = mainOutputValues;
+    }
+
+    private final List<WindowedValue<KV<KV<String, OffsetRange>, Double>>> 
mainOutputValues;
+
+    @Override
+    public SplitResult trySplit(double fractionOfRemainder) {
+      return createSplitResult(fractionOfRemainder);
+    }
+
+    @Override
+    public double getProgress() {
+      return 0.7;
+    }
+
+    @Override
+    public void accept(WindowedValue input) throws Exception {
+      mainOutputValues.add(input);
+    }
+  }
+
+  @Test
+  public void 
testProcessElementForTruncateAndSizeRestrictionForwardSplitAndProgress()

Review comment:
       Please make this test the non-window-observing variant and add a window-observing one that returns null instead of progress/split.

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -488,90 +493,70 @@
             || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
             || !sideInputMapping.isEmpty()) {
           mainInputConsumer = 
this::processElementForWindowObservingSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new WindowObservingProcessBundleContext() {
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+
+        } else {
+          mainInputConsumer = this::processElementForSplitRestriction;
+          this.processContext =
+              new SizedRestrictionNonWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN);
+        }
+        break;
+      case PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN:
+        if ((doFnSignature.truncateRestriction() != null
+                && doFnSignature.truncateRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty()) {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                private final HandlesSplits splitDelegate =
+                    (HandlesSplits) Iterables.get(mainOutputConsumers, 0);
+
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  processElementForWindowObservingTruncateRestriction(input);
+                }
+
+                @Override
+                public SplitResult trySplit(double fractionOfRemainder) {
+                  return splitDelegate.trySplit(fractionOfRemainder);
+                }
+
                 @Override
-                public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> 
doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> 
restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, 
currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentWindow,
-                              currentElement.getPane()));
+                public double getProgress() {
+                  return splitDelegate.getProgress();
                 }
               };
-        } else {
-          mainInputConsumer = this::processElementForSplitRestriction;
-          // OutputT == RestrictionT
           this.processContext =
-              new NonWindowObservingProcessBundleContext() {
+              new SizedRestrictionWindowObservingProcessBundleContext(
+                  
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN);
+        } else {
+          mainInputConsumer =
+              new SplittableFnDataReceiver() {
+                private final HandlesSplits splitDelegate =
+                    (HandlesSplits) Iterables.get(mainOutputConsumers, 0);
+
+                @Override
+                public void accept(WindowedValue input) throws Exception {
+                  processElementForTruncateRestriction(input);
+                }
+
                 @Override
-                public void outputWithTimestamp(OutputT output, Instant 
timestamp) {
-                  double size =
-                      doFnInvoker.invokeGetSize(
-                          new DelegatingArgumentProvider<InputT, OutputT>(
-                              this,
-                              
PTransformTranslation.SPLITTABLE_SPLIT_AND_SIZE_RESTRICTIONS_URN
-                                  + "/GetSize") {
-                            @Override
-                            public Object restriction() {
-                              return output;
-                            }
-
-                            @Override
-                            public Instant timestamp(DoFn<InputT, OutputT> 
doFn) {
-                              return timestamp;
-                            }
-
-                            @Override
-                            public RestrictionTracker<?, ?> 
restrictionTracker() {
-                              return doFnInvoker.invokeNewTracker(this);
-                            }
-                          });
-
-                  outputTo(
-                      mainOutputConsumers,
-                      (WindowedValue<OutputT>)
-                          WindowedValue.of(
-                              KV.of(
-                                  KV.of(
-                                      currentElement.getValue(),
-                                      KV.of(output, 
currentWatermarkEstimatorState)),
-                                  size),
-                              timestamp,
-                              currentElement.getWindows(),
-                              currentElement.getPane()));
+                public SplitResult trySplit(double fractionOfRemainder) {
+                  return splitDelegate.trySplit(fractionOfRemainder);
+                }
+
+                @Override
+                public double getProgress() {
+                  return splitDelegate.getProgress();

Review comment:
       Return null here as well.



