kennknowles commented on code in PR #35021:
URL: https://github.com/apache/beam/pull/35021#discussion_r2159157147


##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java:
##########
@@ -0,0 +1,1016 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.fn.harness.state.FnApiStateAccessor;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
+import org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnInvoker.DelegatingArgumentProvider;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.TruncateResult;
+import 
org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.ParDoTranslation;
+import org.apache.beam.sdk.util.construction.RehydratedComponents;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A runner for the PTransform that truncates sized restrictions, for the case 
of draining a
+ * pipeline.
+ *
+ * <p>The input and output types for this transform are
+ * <li>{@code WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>}
+ */
+public class SplittableTruncateSizedRestrictionsDoFnRunner<
+        InputT, RestrictionT extends @NonNull Object, PositionT, 
WatermarkEstimatorStateT, OutputT>
+    implements FnApiStateAccessor.MutatingStateContext<Void, BoundedWindow> {
+
+  /** A registrar which provides a factory to handle Java {@link DoFn}s. */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements PTransformRunnerFactory.Registrar {
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() 
{
+      Factory factory = new Factory();
+      return ImmutableMap.<String, PTransformRunnerFactory>builder()
+          
.put(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN, factory)
+          .build();
+    }
+  }
+
+  static class Factory implements PTransformRunnerFactory {
+    @Override
+    public final void addRunnerForPTransform(Context context) throws 
IOException {
+      addRunnerForTruncateSizedRestrictions(context);
+    }
+
+    private <
+            InputT,
+            RestrictionT extends @NonNull Object,
+            PositionT,
+            WatermarkEstimatorStateT,
+            OutputT>
+        void addRunnerForTruncateSizedRestrictions(Context context) throws 
IOException {
+
+      FnApiStateAccessor<Void> stateAccessor =
+          
FnApiStateAccessor.Factory.<Void>factoryForPTransformContext(context).create();
+
+      // Main output
+      checkArgument(
+          context.getPTransform().getOutputsMap().size() == 1,
+          "TruncateSizedRestrictions expects exact one output, but got: ",
+          context.getPTransform().getOutputsMap().size());
+      TupleTag<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> mainOutputTag =
+          new TupleTag<>(
+              
Iterables.getOnlyElement(context.getPTransform().getOutputsMap().keySet()));
+
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainOutputConsumer =
+              context.getPCollectionConsumer(
+                  
context.getPTransform().getOutputsOrThrow(mainOutputTag.getId()));
+
+      SplittableTruncateSizedRestrictionsDoFnRunner<
+              InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, 
OutputT>
+          runner =
+              new SplittableTruncateSizedRestrictionsDoFnRunner<>(
+                  context.getPipelineOptions(),
+                  context.getPTransformId(),
+                  context.getPTransform(),
+                  context.getComponents(),
+                  mainOutputConsumer,
+                  stateAccessor);
+
+      // Register input consumer that delegates splitting
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainInputConsumer;
+
+      if (mainOutputConsumer instanceof HandlesSplits) {
+        mainInputConsumer =
+            new SplitDelegatingFnDataReceiver<>(runner, (HandlesSplits) 
mainOutputConsumer);
+      } else {
+        mainInputConsumer = runner::processElement;
+      }
+
+      context.addPCollectionConsumer(
+          context
+              .getPTransform()
+              
.getInputsOrThrow(ParDoTranslation.getMainInputName(context.getPTransform())),
+          mainInputConsumer);
+      context.addTearDownFunction(runner::tearDown);
+    }
+  }
+
+  
//////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private final boolean observesWindow;
+  private final PipelineOptions pipelineOptions;
+
+  private final DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+  private final FnDataReceiver<
+          WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+      mainOutputConsumer;
+
+  private final FnApiStateAccessor<?> stateAccessor;
+
+  private final TruncateSizedRestrictionArgumentProvider 
mutableArgumentProvider;
+
+  private final Coder<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>
+      inputCoder;
+  private final String pTransformId;
+  private final RunnerApi.PTransform pTransform;
+  private final Coder<BoundedWindow> windowCoder;
+  private final String mainInputId;
+
+  /**
+   * Used to guarantee a consistent view of this {@link
+   * SplittableTruncateSizedRestrictionsDoFnRunner} while setting up for {@link
+   * DoFnInvoker#invokeProcessElement} since {@link #trySplit} may access 
internal {@link
+   * SplittableTruncateSizedRestrictionsDoFnRunner} state concurrently.
+   */
+  private final Object splitLock = new Object();
+
+  private final DoFnSchemaInformation doFnSchemaInformation;
+  private final Map<String, PCollectionView<?>> sideInputMapping;
+
+  ///
+  // Mutating fields that change with the element and window being processed
+  //
+  private int windowCurrentIndex;
+  private @Nullable List<BoundedWindow> currentWindows;
+  private @Nullable RestrictionT currentRestriction;
+  private @Nullable WatermarkEstimatorStateT currentWatermarkEstimatorState;
+  private @Nullable Instant initialWatermark;
+  private WatermarkEstimators.@Nullable 
WatermarkAndStateObserver<WatermarkEstimatorStateT>
+      currentWatermarkEstimator;
+  private @Nullable BoundedWindow currentWindow;
+  private @Nullable RestrictionTracker<RestrictionT, PositionT> currentTracker;
+  private @Nullable WindowedValue<InputT> currentElement;
+  /**
+   * The window index at which processing should stop. The window with this 
index should not be
+   * processed.
+   */
+  private int windowStopIndex;
+  /**
+   * The window index which is currently being processed. This should always 
be less than
+   * windowStopIndex.
+   */
+  SplittableTruncateSizedRestrictionsDoFnRunner(
+      PipelineOptions pipelineOptions,
+      String pTransformId,
+      PTransform pTransform,
+      RunnerApi.Components components,
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainOutputConsumer,
+      FnApiStateAccessor<Void> stateAccessor)
+      throws IOException {
+    this.pipelineOptions = pipelineOptions;
+    this.stateAccessor = stateAccessor;
+    this.pTransformId = pTransformId;
+    this.pTransform = pTransform;
+
+    ParDoPayload parDoPayload = 
ParDoPayload.parseFrom(pTransform.getSpec().getPayload());
+
+    // DoFn and metadata
+    DoFn<InputT, OutputT> doFn = (DoFn<InputT, OutputT>) 
ParDoTranslation.getDoFn(parDoPayload);
+    DoFnSignature doFnSignature = DoFnSignatures.signatureForDoFn(doFn);
+    this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, pipelineOptions);
+    this.doFnSchemaInformation = 
ParDoTranslation.getSchemaInformation(parDoPayload);
+
+    this.mainOutputConsumer = mainOutputConsumer;
+
+    // Side inputs
+    this.sideInputMapping = ParDoTranslation.getSideInputMapping(parDoPayload);
+
+    // Register processing methods
+    this.observesWindow =
+        (doFnSignature.splitRestriction() != null
+                && doFnSignature.splitRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty();
+
+    if (observesWindow) {
+      this.mutableArgumentProvider = new 
TruncateSizedRestrictionWindowObservingArgumentProvider();
+    } else {
+      this.mutableArgumentProvider =
+          new TruncateSizedRestrictionNonWindowObservingArgumentProvider();
+    }
+
+    // Main Input
+    this.mainInputId = ParDoTranslation.getMainInputName(pTransform);
+    RunnerApi.PCollection mainInput =
+        
components.getPcollectionsOrThrow(pTransform.getInputsOrThrow(mainInputId));
+    RehydratedComponents rehydratedComponents =
+        
RehydratedComponents.forComponents(components).withPipeline(Pipeline.create());
+    this.inputCoder =
+        (Coder<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>>)
+            rehydratedComponents.getCoder(mainInput.getCoderId());
+    this.windowCoder =
+        (Coder<BoundedWindow>)
+            rehydratedComponents
+                .getWindowingStrategy(mainInput.getWindowingStrategyId())
+                .getWindowFn()
+                .windowCoder();
+  }
+
+  @Override
+  public Void getCurrentKey() {
+    return null;
+  }
+
+  @Override
+  public BoundedWindow getCurrentWindow() {
+    return checkStateNotNull(
+        currentWindow, "Attempt to access window outside windowed element 
processing context.");
+  }
+
+  public List<BoundedWindow> getCurrentWindows() {
+    return checkStateNotNull(
+        currentWindows,
+        "Attempt to access window collection outside windowed element 
processing context.");
+  }
+
+  public WindowedValue<InputT> getCurrentElement() {
+    return checkStateNotNull(
+        currentElement, "Attempt to access element outside element processing 
context.");
+  }
+
+  private RestrictionT getCurrentRestriction() {
+    return checkStateNotNull(
+        this.currentRestriction,
+        "Attempt to access restriction outside element processing context.");
+  }
+
+  private RestrictionTracker<RestrictionT, ?> getCurrentTracker() {
+    return checkStateNotNull(
+        this.currentTracker,
+        "Attempt to access restriction tracker state outside element 
processing context.");
+  }
+
+  // Because WatermarkEstimatorStateT may allow nulls, we cannot use 
checkStateNotNull to ensure we
+  // are in an
+  // element processing context.
+  //
+  // But because it may _not_ accept nulls, we cannot safely return 
currentWatermarkEstimatorState
+  // without
+  // checking for null.
+  //
+  // There is no solution for the type of currentWatermarkEstimatorState; we 
would have to introduce
+  // a "MutableOptional"

Review Comment:
   Yes, I think `WatermarkEstimatorStateT` may be a nullable type. I reviewed 
the codebase to figure it out. It rarely is, but there are some cases, so 
`WatermarkEstimatorStateT extends @NonNull Object` does not work (for 
`RestrictionT` it did work, and it simplifies things a lot).
   
   The whole reason I made this getter was to validate that the method isn't 
called in an invalid state. We could just not check that, of course. Then we 
don't need the getter at all.
   
   The reason I suppressed the check and added a long-winded comment: it is 
impossible to convert `@Nullable WatermarkEstimatorStateT` (the type of the 
field) to `WatermarkEstimatorStateT`. We have this problem in a few places in 
the codebase.
   
    - We cannot check for null, because `WatermarkEstimatorStateT` may be 
something like `@Nullable String`.
    - If we don't check for null, then it is a type error, because 
`WatermarkEstimatorStateT` may be something like `String`.
   
   We should actually add this to some simple analysis if we can: using a 
nullable field to store a value whose type is a type variable is always wrong.
   
   BUT I figured out the right pattern, finally, thanks to your comment making 
me think a bit harder.
   
    - We cannot use `Optional` because actually `Optional<T>` does not allow a 
nullable type for `T` (bizarre since this could be one of its only purposes) 
and also the JDK has a lot of opinions about how and where it should be used 
which render it essentially useless.
    - I created `Holder<T>` that just boxes the value.
    - So now a `@Nullable Holder<T>` is either null or a valid boxed `T`, 
whether or not `T` includes the value `null`.



##########
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/SplittableTruncateSizedRestrictionsDoFnRunner.java:
##########
@@ -0,0 +1,1016 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.service.AutoService;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.fn.harness.state.FnApiStateAccessor;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.BundleApplication;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.DelayedBundleApplication;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
+import org.apache.beam.sdk.fn.splittabledofn.WatermarkEstimators;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import 
org.apache.beam.sdk.transforms.reflect.DoFnInvoker.DelegatingArgumentProvider;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
+import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.TruncateResult;
+import 
org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ByteStringOutputStream;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.construction.PTransformTranslation;
+import org.apache.beam.sdk.util.construction.ParDoTranslation;
+import org.apache.beam.sdk.util.construction.RehydratedComponents;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A runner for the PTransform that truncates sized restrictions, for the case 
of draining a
+ * pipeline.
+ *
+ * <p>The input and output types for this transform are
+ * <li>{@code WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>}
+ */
+public class SplittableTruncateSizedRestrictionsDoFnRunner<
+        InputT, RestrictionT extends @NonNull Object, PositionT, 
WatermarkEstimatorStateT, OutputT>
+    implements FnApiStateAccessor.MutatingStateContext<Void, BoundedWindow> {
+
+  /** A registrar which provides a factory to handle Java {@link DoFn}s. */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements PTransformRunnerFactory.Registrar {
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() 
{
+      Factory factory = new Factory();
+      return ImmutableMap.<String, PTransformRunnerFactory>builder()
+          
.put(PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN, factory)
+          .build();
+    }
+  }
+
+  static class Factory implements PTransformRunnerFactory {
+    @Override
+    public final void addRunnerForPTransform(Context context) throws 
IOException {
+      addRunnerForTruncateSizedRestrictions(context);
+    }
+
+    private <
+            InputT,
+            RestrictionT extends @NonNull Object,
+            PositionT,
+            WatermarkEstimatorStateT,
+            OutputT>
+        void addRunnerForTruncateSizedRestrictions(Context context) throws 
IOException {
+
+      FnApiStateAccessor<Void> stateAccessor =
+          
FnApiStateAccessor.Factory.<Void>factoryForPTransformContext(context).create();
+
+      // Main output
+      checkArgument(
+          context.getPTransform().getOutputsMap().size() == 1,
+          "TruncateSizedRestrictions expects exact one output, but got: ",
+          context.getPTransform().getOutputsMap().size());
+      TupleTag<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>> mainOutputTag =
+          new TupleTag<>(
+              
Iterables.getOnlyElement(context.getPTransform().getOutputsMap().keySet()));
+
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainOutputConsumer =
+              context.getPCollectionConsumer(
+                  
context.getPTransform().getOutputsOrThrow(mainOutputTag.getId()));
+
+      SplittableTruncateSizedRestrictionsDoFnRunner<
+              InputT, RestrictionT, PositionT, WatermarkEstimatorStateT, 
OutputT>
+          runner =
+              new SplittableTruncateSizedRestrictionsDoFnRunner<>(
+                  context.getPipelineOptions(),
+                  context.getPTransformId(),
+                  context.getPTransform(),
+                  context.getComponents(),
+                  mainOutputConsumer,
+                  stateAccessor);
+
+      // Register input consumer that delegates splitting
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainInputConsumer;
+
+      if (mainOutputConsumer instanceof HandlesSplits) {
+        mainInputConsumer =
+            new SplitDelegatingFnDataReceiver<>(runner, (HandlesSplits) 
mainOutputConsumer);
+      } else {
+        mainInputConsumer = runner::processElement;
+      }
+
+      context.addPCollectionConsumer(
+          context
+              .getPTransform()
+              
.getInputsOrThrow(ParDoTranslation.getMainInputName(context.getPTransform())),
+          mainInputConsumer);
+      context.addTearDownFunction(runner::tearDown);
+    }
+  }
+
+  
//////////////////////////////////////////////////////////////////////////////////////////////////
+
+  private final boolean observesWindow;
+  private final PipelineOptions pipelineOptions;
+
+  private final DoFnInvoker<InputT, OutputT> doFnInvoker;
+
+  private final FnDataReceiver<
+          WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+      mainOutputConsumer;
+
+  private final FnApiStateAccessor<?> stateAccessor;
+
+  private final TruncateSizedRestrictionArgumentProvider 
mutableArgumentProvider;
+
+  private final Coder<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>
+      inputCoder;
+  private final String pTransformId;
+  private final RunnerApi.PTransform pTransform;
+  private final Coder<BoundedWindow> windowCoder;
+  private final String mainInputId;
+
+  /**
+   * Used to guarantee a consistent view of this {@link
+   * SplittableTruncateSizedRestrictionsDoFnRunner} while setting up for {@link
+   * DoFnInvoker#invokeProcessElement} since {@link #trySplit} may access 
internal {@link
+   * SplittableTruncateSizedRestrictionsDoFnRunner} state concurrently.
+   */
+  private final Object splitLock = new Object();
+
+  private final DoFnSchemaInformation doFnSchemaInformation;
+  private final Map<String, PCollectionView<?>> sideInputMapping;
+
+  ///
+  // Mutating fields that change with the element and window being processed
+  //
+  private int windowCurrentIndex;
+  private @Nullable List<BoundedWindow> currentWindows;
+  private @Nullable RestrictionT currentRestriction;
+  private @Nullable WatermarkEstimatorStateT currentWatermarkEstimatorState;
+  private @Nullable Instant initialWatermark;
+  private WatermarkEstimators.@Nullable 
WatermarkAndStateObserver<WatermarkEstimatorStateT>
+      currentWatermarkEstimator;
+  private @Nullable BoundedWindow currentWindow;
+  private @Nullable RestrictionTracker<RestrictionT, PositionT> currentTracker;
+  private @Nullable WindowedValue<InputT> currentElement;
+  /**
+   * The window index at which processing should stop. The window with this 
index should not be
+   * processed.
+   */
+  private int windowStopIndex;
+  /**
+   * The window index which is currently being processed. This should always 
be less than
+   * windowStopIndex.
+   */
+  SplittableTruncateSizedRestrictionsDoFnRunner(
+      PipelineOptions pipelineOptions,
+      String pTransformId,
+      PTransform pTransform,
+      RunnerApi.Components components,
+      FnDataReceiver<
+              WindowedValue<KV<KV<InputT, KV<RestrictionT, 
WatermarkEstimatorStateT>>, Double>>>
+          mainOutputConsumer,
+      FnApiStateAccessor<Void> stateAccessor)
+      throws IOException {
+    this.pipelineOptions = pipelineOptions;
+    this.stateAccessor = stateAccessor;
+    this.pTransformId = pTransformId;
+    this.pTransform = pTransform;
+
+    ParDoPayload parDoPayload = 
ParDoPayload.parseFrom(pTransform.getSpec().getPayload());
+
+    // DoFn and metadata
+    DoFn<InputT, OutputT> doFn = (DoFn<InputT, OutputT>) 
ParDoTranslation.getDoFn(parDoPayload);
+    DoFnSignature doFnSignature = DoFnSignatures.signatureForDoFn(doFn);
+    this.doFnInvoker = DoFnInvokers.tryInvokeSetupFor(doFn, pipelineOptions);
+    this.doFnSchemaInformation = 
ParDoTranslation.getSchemaInformation(parDoPayload);
+
+    this.mainOutputConsumer = mainOutputConsumer;
+
+    // Side inputs
+    this.sideInputMapping = ParDoTranslation.getSideInputMapping(parDoPayload);
+
+    // Register processing methods
+    this.observesWindow =
+        (doFnSignature.splitRestriction() != null
+                && doFnSignature.splitRestriction().observesWindow())
+            || (doFnSignature.newTracker() != null && 
doFnSignature.newTracker().observesWindow())
+            || (doFnSignature.getSize() != null && 
doFnSignature.getSize().observesWindow())
+            || !sideInputMapping.isEmpty();
+
+    if (observesWindow) {
+      this.mutableArgumentProvider = new 
TruncateSizedRestrictionWindowObservingArgumentProvider();
+    } else {
+      this.mutableArgumentProvider =
+          new TruncateSizedRestrictionNonWindowObservingArgumentProvider();
+    }
+
+    // Main Input
+    this.mainInputId = ParDoTranslation.getMainInputName(pTransform);
+    RunnerApi.PCollection mainInput =
+        
components.getPcollectionsOrThrow(pTransform.getInputsOrThrow(mainInputId));
+    RehydratedComponents rehydratedComponents =
+        
RehydratedComponents.forComponents(components).withPipeline(Pipeline.create());
+    this.inputCoder =
+        (Coder<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, 
Double>>)
+            rehydratedComponents.getCoder(mainInput.getCoderId());
+    this.windowCoder =
+        (Coder<BoundedWindow>)
+            rehydratedComponents
+                .getWindowingStrategy(mainInput.getWindowingStrategyId())
+                .getWindowFn()
+                .windowCoder();
+  }
+
+  @Override
+  public Void getCurrentKey() {
+    return null;
+  }
+
+  @Override
+  public BoundedWindow getCurrentWindow() {
+    return checkStateNotNull(
+        currentWindow, "Attempt to access window outside windowed element 
processing context.");
+  }
+
+  public List<BoundedWindow> getCurrentWindows() {
+    return checkStateNotNull(
+        currentWindows,
+        "Attempt to access window collection outside windowed element 
processing context.");
+  }
+
+  public WindowedValue<InputT> getCurrentElement() {
+    return checkStateNotNull(
+        currentElement, "Attempt to access element outside element processing 
context.");
+  }
+
+  private RestrictionT getCurrentRestriction() {
+    return checkStateNotNull(
+        this.currentRestriction,
+        "Attempt to access restriction outside element processing context.");
+  }
+
+  private RestrictionTracker<RestrictionT, ?> getCurrentTracker() {
+    return checkStateNotNull(
+        this.currentTracker,
+        "Attempt to access restriction tracker state outside element 
processing context.");
+  }
+
+  // Because WatermarkEstimatorStateT may allow nulls, we cannot use 
checkStateNotNull to ensure we
+  // are in an
+  // element processing context.
+  //
+  // But because it may _not_ accept nulls, we cannot safely return 
currentWatermarkEstimatorState
+  // without
+  // checking for null.
+  //
+  // There is no solution for the type of currentWatermarkEstimatorState; we 
would have to introduce
+  // a "MutableOptional"

Review Comment:
   Yes, I think `WatermarkEstimatorStateT` may be a nullable type. I reviewed 
the codebase to figure it out. It rarely is, but there are some cases, so 
`WatermarkEstimatorStateT extends @NonNull Object` does not work (for 
`RestrictionT` it did work, and it is beneficial).
   
   The whole reason I made this getter was to validate that the method isn't 
called in an invalid state. We could just not check that, of course. Then we 
don't need the getter at all.
   
   The reason I suppressed the check and added a long-winded comment: it is 
impossible to convert `@Nullable WatermarkEstimatorStateT` (the type of the 
field) to `WatermarkEstimatorStateT`. We have this problem in a few places in 
the codebase.
   
    - We cannot check for null, because `WatermarkEstimatorStateT` may be 
something like `@Nullable String`.
    - If we don't check for null, then it is a type error, because 
`WatermarkEstimatorStateT` may be something like `String`.
   
   We should actually add this to some simple analysis if we can: using a 
nullable field to store a value whose type is a type variable is always wrong.
   
   BUT I figured out the right pattern, finally, thanks to your comment making 
me think a bit harder.
   
    - We cannot use `Optional` because actually `Optional<T>` does not allow a 
nullable type for `T` (bizarre since this could be one of its only purposes) 
and also the JDK has a lot of opinions about how and where it should be used 
which render it essentially useless.
    - I created `Holder<T>` that just boxes the value.
    - So now a `@Nullable Holder<T>` is either null or a valid boxed `T`, 
whether or not `T` includes the value `null`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to