scwhittle commented on code in PR #35021:
URL: https://github.com/apache/beam/pull/35021#discussion_r2166715711


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/Holder.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+/**
+ * A trivial boxing of a value, used when nullability needs to be added to a 
generic type. (Optional
+ * does not work for this)
+ *
+ * <p>Example: For a generic type `T` the actual parameter may be nullable or 
not. So you cannot
+ * check values for null to determine presence/absence. Instead you can store 
a {@code @Nullable
+ * Holder<T>}.
+ */
+public class Holder<T> {

Review Comment:
   mark Internal?



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java:
##########
@@ -0,0 +1,1016 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.fn.harness.state.FnApiStateAccessor;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
+import org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnInvoker.DelegatingArgumentProvider;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.TruncateResult;
+import 
org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.ParDoTranslation;
+import org.apache.beam.sdk.util.construction.RehydratedComponents;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A runner for the PTransform that truncates sized restrictions, for the case 
of draining a
+ * pipeline.
+ *
+ * <p>The input and output types for this transform are
+ * <li>{@code WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>}
+ */
+public class SplittableTruncateSizedRestrictionsDoFnRunner<
+        InputT, RestrictionT extends @NonNull Object, PositionT, 
WatermarkEstimatorStateT, OutputT>
+    implements FnApiStateAccessor.MutatingStateContext<Void, BoundedWindow> {
+
+  /** A registrar which provides a factory to handle Java {@link DoFn}s. */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements PTransformRunnerFactory.Registrar {
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() 
{
+      Factory factory = new Factory();
+      return ImmutableMap.<String, PTransformRunnerFactory>builder()
+          
.put(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN, factory)
+          .build();
+    }
+  }
+
+  static class Factory implements PTransformRunnerFactory {
+    @Override
+    public final void addRunnerForPTransform(Context context) throws 
IOException {
+      addRunnerForTruncateSizedRestrictions(context);
+    }
+
+    private <
+            InputT,
+            RestrictionT extends @NonNull Object,
+            PositionT,
+            WatermarkEstimatorStateT,
+            OutputT>
+        void addRunnerForTruncateSizedRestrictions(Context context) throws 
IOException {
+
+      FnApiStateAccessor<Void> stateAccessor =
+          
FnApiStateAccessor.Factory.<Void>factoryForPTransformContext(context).create();
+
+      // Main output
+      checkArgument(
+          context.getPTransform().getOutputsMap().size() == 1,
+          "TruncateSizedRestrictions expects exact one output, but got: ",
+          context.getPTransform().getOutputsMap().size());
+      TupleTag<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> mainOutputTag =
+          new TupleTag<>(
+              
Iterables.getOnlyElement(context.getPTransform().getOutputsMap().keySet()));
+
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainOutputConsumer =
+              context.getPCollectionConsumer(
+                  
context.getPTransform().getOutputsOrThrow(mainOutputTag.getId()));
+
+      SplittableTruncateSizedRestrictionsDoFnRunner<
+              InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, 
OutputT>
+          runner =
+              new SplittableTruncateSizedRestrictionsDoFnRunner<>(
+                  context.getPipelineOptions(),
+                  context.getPTransformId(),
+                  context.getPTransform(),
+                  context.getComponents(),
+                  mainOutputConsumer,
+                  stateAccessor);
+
+      // Register input consumer that delegates splitting
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainInputConsumer;
+
+      if (mainOutputConsumer instanceof HandlesSplits) {
+        mainInputConsumer =
+            new SplitDelegatingFnDataReceiver<>(runner, (HandlesSplits) 
mainOutputConsumer);
+      } else {
+        mainInputConsumer = runner::processElement;
+      }
+
+      context.addPCollectionConsumer(
+          context
+              .getPTransform()
+              
.getInputsOrThrow(ParDoTranslation.getMainInputName(context.getPTransform())),
+          mainInputConsumer);
+      context.addTearDownFunction(runner::tearDown);
+    }
+  }
+
+  
//////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private final boolean observesWindow;
+  private final PipelineOptions pipelineOptions;
+
+  private final DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+  private final FnDataReceiver<
+          WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+      mainOutputConsumer;
+
+  private final FnApiStateAccessor<?> stateAccessor;
+
+  private final TruncateSizedRestrictionArgumentProvider 
mutableArgumentProvider;
+
+  private final Coder<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>
+      inputCoder;
+  private final String pTransformId;
+  private final RunnerApi.PTransform pTransform;
+  private final Coder<BoundedWindow> windowCoder;
+  private final String mainInputId;
+
+  /**
+   * Used to guarantee a consistent view of this {@link
+   * SplittableTruncateSizedRestrictionsDoFnRunner} while setting up for {@link
+   * DoFnInvoker#invokeProcessElement} since {@link #trySplit} may access 
internal {@link
+   * SplittableTruncateSizedRestrictionsDoFnRunner} state concurrently.
+   */
+  private final Object splitLock = new Object();
+
+  private final DoFnSchemaInformation doFnSchemaInformation;
+  private final Map<String, PCollectionView<?>> sideInputMapping;
+
+  ///
+  // Mutating fields that change with the element and window being processed
+  //
+  private int windowCurrentIndex;
+  private @Nullable List<BoundedWindow> currentWindows;
+  private @Nullable RestrictionT currentRestriction;
+  private @Nullable WatermarkEstimatorStateT currentWatermarkEstimatorState;
+  private @Nullable Instant initialWatermark;
+  private WatermarkEstimators.@Nullable 
WatermarkAndStateObserver<WatermarkEstimatorStateT>
+      currentWatermarkEstimator;
+  private @Nullable BoundedWindow currentWindow;
+  private @Nullable RestrictionTracker<RestrictionT, PositionT> currentTracker;
+  private @Nullable WindowedValue<InputT> currentElement;
+  /**
+   * The window index at which processing should stop. The window with this 
index should not be
+   * processed.
+   */
+  private int windowStopIndex;
+  /**
+   * The window index which is currently being processed. This should always 
be less than
+   * windowStopIndex.
+   */
+  SplittableTruncateSizedRestrictionsDoFnRunner(
+      PipelineOptions pipelineOptions,
+      String pTransformId,
+      PTransform pTransform,
+      RunnerApi.Components components,
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainOutputConsumer,
+      FnApiStateAccessor<Void> stateAccessor)
+      throws IOException {
+    this.pipelineOptions = pipelineOptions;
+    this.stateAccessor = stateAccessor;
+    this.pTransformId = pTransformId;
+    this.pTransform = pTransform;
+
+    ParDoPayload parDoPayload = 
ParDoPayload.parseFrom(pTransform.getSpec().getPayload());
+
+    // DoFn and metadata
+    DoFn<InputT, OutputT> doFn = (DoFn<InputT, OutputT>) 
ParDoTranslation.getDoFn(parDoPayload);
+    DoFnSignature doFnSignature = DoFnSignatures.signatureForDoFn(doFn);
+    this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, pipelineOptions);
+    this.doFnSchemaInformation = 
ParDoTranslation.getSchemaInformation(parDoPayload);
+
+    this.mainOutputConsumer = mainOutputConsumer;
+
+    // Side inputs
+    this.sideInputMapping = ParDoTranslation.getSideInputMapping(parDoPayload);
+
+    // Register processing methods
+    this.observesWindow =
+        (doFnSignature.splitRestriction() != null
+                && doFnSignature.splitRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty();
+
+    if (observesWindow) {
+      this.mutableArgumentProvider = new 
TruncateSizedRestrictionWindowObservingArgumentProvider();
+    } else {
+      this.mutableArgumentProvider =
+          new TruncateSizedRestrictionNonWindowObservingArgumentProvider();
+    }
+
+    // Main Input
+    this.mainInputId = ParDoTranslation.getMainInputName(pTransform);
+    RunnerApi.PCollection mainInput =
+        
components.getPcollectionsOrThrow(pTransform.getInputsOrThrow(mainInputId));
+    RehydratedComponents rehydratedComponents =
+        
RehydratedComponents.forComponents(components).withPipeline(Pipeline.create());
+    this.inputCoder =
+        (Coder<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>>)
+            rehydratedComponents.getCoder(mainInput.getCoderId());
+    this.windowCoder =
+        (Coder<BoundedWindow>)
+            rehydratedComponents
+                .getWindowingStrategy(mainInput.getWindowingStrategyId())
+                .getWindowFn()
+                .windowCoder();
+  }
+
+  @Override
+  public Void getCurrentKey() {
+    return null;
+  }
+
+  @Override
+  public BoundedWindow getCurrentWindow() {
+    return checkStateNotNull(
+        currentWindow, "Attempt to access window outside windowed element 
processing context.");
+  }
+
+  public List<BoundedWindow> getCurrentWindows() {
+    return checkStateNotNull(
+        currentWindows,
+        "Attempt to access window collection outside windowed element 
processing context.");
+  }
+
+  public WindowedValue<InputT> getCurrentElement() {
+    return checkStateNotNull(
+        currentElement, "Attempt to access element outside element processing 
context.");
+  }
+
+  private RestrictionT getCurrentRestriction() {
+    return checkStateNotNull(
+        this.currentRestriction,
+        "Attempt to access restriction outside element processing context.");
+  }
+
+  private RestrictionTracker<RestrictionT, ?> getCurrentTracker() {
+    return checkStateNotNull(
+        this.currentTracker,
+        "Attempt to access restriction tracker state outside element 
processing context.");
+  }
+
+  // Because WatermarkEstimatorStateT may allow nulls, we cannot use 
checkStateNotNull to ensure we
+  // are in an
+  // element processing context.
+  //
+  // But because it may _not_ accept nulls, we cannot safely return 
currentWatermarkEstimatorState
+  // without
+  // checking for null.
+  //
+  // There is no solution for the type of currentWatermarkEstimatorState; we 
would have to introduce
+  // a "MutableOptional"
+  // to hold the present-or-absent value. Ultimately, the root cause is the 
antipattern of a class
+  // where fields
+  // are mutated to non-null then back to null, creating sensitive state 
machine invariants between
+  // methods.
+  @SuppressWarnings("nullness")
+  private WatermarkEstimatorStateT getCurrentWatermarkEstimatorState() {
+    checkStateNotNull(
+        this.currentElement,
+        "Attempt to access watermark estimator state outside element 
processing context.");
+    return this.currentWatermarkEstimatorState;
+  }
+
+  void processElement(
+      WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
+    if (observesWindow) {
+      processElementForWindowObservingTruncateRestriction(elem);
+    } else {
+      processElementForTruncateRestriction(elem);
+    }
+  }
+
+  HandlesSplits.@Nullable SplitResult trySplit(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    if (observesWindow) {
+      return 
trySplitForWindowObservingTruncateRestriction(fractionOfRemainder, 
splitDelegate);
+    } else {
+      return splitDelegate.trySplit(fractionOfRemainder);
+    }
+  }
+
+  private void processElementForTruncateRestriction(
+      WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
+    currentElement = elem.withValue(elem.getValue().getKey().getKey());
+    currentRestriction = elem.getValue().getKey().getValue().getKey();
+    currentWatermarkEstimatorState = 
elem.getValue().getKey().getValue().getValue();
+    currentTracker =
+        RestrictionTrackers.observe(
+            doFnInvoker.invokeNewTracker(mutableArgumentProvider),
+            new ClaimObserver<PositionT>() {
+              @Override
+              public void onClaimed(PositionT position) {}
+
+              @Override
+              public void onClaimFailed(PositionT position) {}
+            });
+    try {
+      TruncateResult<RestrictionT> truncatedRestriction =
+          doFnInvoker.invokeTruncateRestriction(mutableArgumentProvider);
+      if (truncatedRestriction != null) {
+        
mutableArgumentProvider.output(truncatedRestriction.getTruncatedRestriction());
+      }
+    } finally {
+      currentTracker = null;
+      currentElement = null;
+      currentRestriction = null;
+      currentWatermarkEstimatorState = null;
+    }
+
+    this.stateAccessor.finalizeState();
+  }
+
+  private void processElementForWindowObservingTruncateRestriction(
+      WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> elem) {
+    currentElement = elem.withValue(elem.getValue().getKey().getKey());
+    windowCurrentIndex = -1;
+    windowStopIndex = elem.getWindows().size();
+    currentWindows = ImmutableList.copyOf(elem.getWindows());
+    while (true) {
+      synchronized (splitLock) {
+        windowCurrentIndex++;
+        if (windowCurrentIndex >= windowStopIndex) {
+          // Careful to reset the split state under the same synchronized 
block.
+          windowCurrentIndex = -1;
+          windowStopIndex = 0;
+          currentElement = null;
+          currentWindows = null;
+          currentRestriction = null;
+          currentWatermarkEstimatorState = null;
+          currentWindow = null;
+          currentTracker = null;
+          currentWatermarkEstimator = null;
+          initialWatermark = null;
+          break;
+        }
+        currentRestriction = elem.getValue().getKey().getValue().getKey();
+        currentWatermarkEstimatorState = 
elem.getValue().getKey().getValue().getValue();
+        currentWindow =
+            checkStateNotNull(
+                    currentWindows,
+                    "internal error: currentWindows is null during element 
processing")
+                .get(windowCurrentIndex);
+        currentTracker =
+            RestrictionTrackers.observe(
+                doFnInvoker.invokeNewTracker(mutableArgumentProvider),
+                new ClaimObserver<PositionT>() {
+                  @Override
+                  public void onClaimed(PositionT position) {}
+
+                  @Override
+                  public void onClaimFailed(PositionT position) {}
+                });
+        currentWatermarkEstimator =
+            WatermarkEstimators.threadSafe(
+                
doFnInvoker.invokeNewWatermarkEstimator(mutableArgumentProvider));
+        initialWatermark = 
currentWatermarkEstimator.getWatermarkAndState().getKey();
+      }
+      TruncateResult<RestrictionT> truncatedRestriction =
+          doFnInvoker.invokeTruncateRestriction(mutableArgumentProvider);
+      if (truncatedRestriction != null) {
+        
mutableArgumentProvider.output(truncatedRestriction.getTruncatedRestriction());
+      }
+    }
+    this.stateAccessor.finalizeState();
+  }
+
+  private @Nullable Progress getProgressFromWindowObservingTruncate(double 
elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindow != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            windowCurrentIndex,
+            windowStopIndex);
+      }
+    }
+    return null;
+  }
+
+  @VisibleForTesting
+  static Progress scaleProgress(Progress progress, int currentWindowIndex, int 
stopWindowIndex) {
+    checkArgument(
+        currentWindowIndex < stopWindowIndex,
+        "Current window index (%s) must be less than stop window index (%s)",
+        currentWindowIndex,
+        stopWindowIndex);
+
+    double totalWorkPerWindow = progress.getWorkCompleted() + 
progress.getWorkRemaining();
+    double completed = totalWorkPerWindow * currentWindowIndex + 
progress.getWorkCompleted();
+    double remaining =
+        totalWorkPerWindow * (stopWindowIndex - currentWindowIndex - 1)
+            + progress.getWorkRemaining();
+    return Progress.from(completed, remaining);
+  }
+
+  private WindowedSplitResult calculateRestrictionSize(
+      WindowedSplitResult splitResult, String errorContext) {
+    double fullSize =
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+                && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(
+                    mutableArgumentProvider, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return getCurrentRestriction();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double primarySize =
+        splitResult.getPrimarySplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(
+                    mutableArgumentProvider, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    WindowedValue<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>>
+                        splitRoot =
+                            (WindowedValue<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>>)
+                                splitResult.getPrimarySplitRoot();
+
+                    return splitRoot.getValue().getValue().getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double residualSize =
+        splitResult.getResidualSplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(
+                    mutableArgumentProvider, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    WindowedValue<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>>
+                        splitRoot =
+                            (WindowedValue<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>>)
+                                splitResult.getResidualSplitRoot();
+
+                    return splitRoot.getValue().getValue().getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    return WindowedSplitResult.forRoots(
+        splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? null
+            : WindowedValues.of(
+                
KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+                
splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+                
splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+                
splitResult.getPrimaryInFullyProcessedWindowsRoot().getPaneInfo()),
+        splitResult.getPrimarySplitRoot() == null
+            ? null
+            : WindowedValues.of(
+                KV.of(splitResult.getPrimarySplitRoot().getValue(), 
primarySize),
+                splitResult.getPrimarySplitRoot().getTimestamp(),
+                splitResult.getPrimarySplitRoot().getWindows(),
+                splitResult.getPrimarySplitRoot().getPaneInfo()),
+        splitResult.getResidualSplitRoot() == null
+            ? null
+            : WindowedValues.of(
+                KV.of(splitResult.getResidualSplitRoot().getValue(), 
residualSize),
+                splitResult.getResidualSplitRoot().getTimestamp(),
+                splitResult.getResidualSplitRoot().getWindows(),
+                splitResult.getResidualSplitRoot().getPaneInfo()),
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+            ? null
+            : WindowedValues.of(
+                
KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+                
splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+                
splitResult.getResidualInUnprocessedWindowsRoot().getPaneInfo()));
+  }
+
+  private HandlesSplits.@Nullable SplitResult 
trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedSplitResult windowedSplitResult;
+    HandlesSplits.SplitResult downstreamSplitResult;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindow == null) {
+        return null;
+      }
+
+      SplitResultsWithStopIndex splitResult =
+          computeSplitForTruncate(
+              getCurrentElement(),
+              getCurrentRestriction(),
+              getCurrentWindow(),
+              getCurrentWindows(),
+              getCurrentWatermarkEstimatorState(),
+              fractionOfRemainder,
+              splitDelegate,
+              windowCurrentIndex,
+              windowStopIndex);
+      if (splitResult == null) {
+        return null;
+      }
+      windowStopIndex = splitResult.getNewWindowStopIndex();
+      windowedSplitResult =
+          calculateRestrictionSize(
+              splitResult.getWindowSplit(),
+              PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN 
+ "/GetSize");
+      downstreamSplitResult = splitResult.getDownstreamSplit();
+    }
+    // Note that the assumption here is the fullInputCoder of the Truncate 
transform should be the
+    // the same as the SDF/Process transform.
+    Coder<WindowedValue<?>> fullInputCoder =
+        (Coder<WindowedValue<?>>) (Object) 
WindowedValues.getFullCoder(inputCoder, windowCoder);

Review Comment:
   I was wondering if this cast could ever fail, and if it could should we be 
creating the fullInputCoder during the construction of this class so we fail 
earlier during setup, since I believe we have inputCoder and windowCoder 
available then.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/ProgressUtils.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+
+/** Miscellaneous methods for working with progress. */
+public abstract class ProgressUtils {

Review Comment:
   Internal?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to