gemini-code-assist[bot] commented on code in PR #38363:
URL: https://github.com/apache/beam/pull/38363#discussion_r3357296557


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java:
##########
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import org.apache.beam.runners.dataflow.worker.counters.Counter;
+import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
+import org.apache.beam.runners.dataflow.worker.counters.CounterName;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
+import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.state.State;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+class SimpleParDoFnHelpers<InputT, OutputT, W extends BoundedWindow> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SimpleParDoFnHelpers.class);
+
+  // TODO: Remove once Distributions has shipped.
+  @VisibleForTesting
+  static final String OUTPUTS_PER_ELEMENT_EXPERIMENT = 
"outputs_per_element_counter";
+
+  private static final String COUNTER_NAME = "per-element-output-count";
+
+  final PipelineOptions options;
+  final DoFnInstanceManager doFnInstanceManager;
+
+  private final SideInputReader sideInputReader;
+  final DataflowOperationContext operationContext;
+  private final TupleTag<OutputT> mainOutputTag;
+  private final Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices;
+  private final List<TupleTag<?>> sideOutputTags;
+  final DataflowExecutionContext.DataflowStepContext stepContext;
+  final DataflowExecutionContext.DataflowStepContext userStepContext;
+  private final CounterFactory counterFactory;
+  private final DoFnRunnerFactory runnerFactory;
+  final boolean hasStreamingSideInput;
+  final OutputsPerElementTracker outputsPerElementTracker;
+  private final DoFnSchemaInformation doFnSchemaInformation;
+  private final Map<String, PCollectionView<?>> sideInputMapping;
+
+  // Various DoFn helpers, null between bundles
+  @Nullable DoFnRunner<InputT, OutputT> fnRunner;
+  @Nullable DoFnInfo<InputT, OutputT> fnInfo;
+  private Receiver @Nullable [] receivers;
+
+  // This may additionally be null if it is not a real DoFn but an OldDoFn or
+  // GroupAlsoByWindowViaWindowSetDoFn
+  protected @Nullable DoFnSignature fnSignature;
+
+  SimpleParDoFnHelpers(
+      PipelineOptions options,
+      DoFnInstanceManager doFnInstanceManager,
+      SideInputReader sideInputReader,
+      TupleTag<OutputT> mainOutputTag,
+      Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
+      DataflowExecutionContext.DataflowStepContext stepContext,
+      DataflowOperationContext operationContext,
+      DoFnSchemaInformation doFnSchemaInformation,
+      Map<String, PCollectionView<?>> sideInputMapping,
+      DoFnRunnerFactory runnerFactory) {
+    this.options = options;
+    this.doFnInstanceManager = doFnInstanceManager;
+
+    // We vend a freshly deserialized version for each run
+    this.sideInputReader = sideInputReader;
+    this.operationContext = operationContext;
+    checkArgument(!outputTupleTagsToReceiverIndices.isEmpty(), "expected at 
least one output");
+    this.mainOutputTag = mainOutputTag;
+    this.outputTupleTagsToReceiverIndices = outputTupleTagsToReceiverIndices;
+    ImmutableList.Builder<TupleTag<?>> sideOutputTagsBuilder = 
ImmutableList.builder();
+    for (TupleTag<?> tag : outputTupleTagsToReceiverIndices.keySet()) {
+      if (!mainOutputTag.equals(tag)) {
+        sideOutputTagsBuilder.add(tag);
+      }
+    }
+    this.sideOutputTags = sideOutputTagsBuilder.build();
+    this.stepContext = stepContext;
+
+    // StepContext provides a TimerInternals and StateInternals for use by the 
system - this class.
+    // For the user, we request a user-scoped StepContext to provide a 
user-scoped
+    // StateInternals and TimerInternals.
+    this.userStepContext = stepContext.namespacedToUser();
+
+    this.counterFactory = operationContext.counterFactory();
+    this.runnerFactory = runnerFactory;
+    this.hasStreamingSideInput =
+        options.as(StreamingOptions.class).isStreaming() && 
!sideInputReader.isEmpty();
+    this.outputsPerElementTracker = createOutputsPerElementTracker();
+    this.doFnSchemaInformation = doFnSchemaInformation;
+    this.sideInputMapping = sideInputMapping;
+  }
+
+  boolean hasState() {
+    return fnSignature != null && !fnSignature.stateDeclarations().isEmpty();
+  }
+
+  void startBundle(Receiver... receivers) throws Exception {
+    checkArgument(
+        receivers.length == outputTupleTagsToReceiverIndices.size(),
+        "unexpected number of receivers for DoFn");
+
+    this.receivers = receivers;
+  }
+
+  void reallyStartBundle() throws Exception {
+    checkState(fnRunner == null, "bundle already started (or not properly 
finished)");
+
+    WindowedValueMultiReceiver outputManager =
+        new WindowedValueMultiReceiver() {
+          final Map<TupleTag<?>, OutputReceiver> undeclaredOutputs = new 
HashMap<>();
+
+          private @Nullable Receiver getReceiverOrNull(TupleTag<?> tag) {
+            Integer receiverIndex = outputTupleTagsToReceiverIndices.get(tag);
+            if (receiverIndex != null) {
+              return receivers[receiverIndex];
+            } else {
+              return undeclaredOutputs.get(tag);
+            }
+          }
+
+          @Override
+          public <TagT> void output(TupleTag<TagT> tag, WindowedValue<TagT> 
output) {
+            outputsPerElementTracker.onOutput();
+            Receiver receiver = getReceiverOrNull(tag);
+            if (receiver == null) {
+              // A new undeclared output.
+              // TODO: plumb through the operationName, so that we can
+              // name implicit outputs after it.
+              String outputName = "implicit-" + tag.getId();
+              // TODO: plumb through the counter prefix, so we can
+              // make it available to the OutputReceiver class in case
+              // it wants to use it in naming output counterFactory.  (It
+              // doesn't today.)
+              OutputReceiver undeclaredReceiver = new OutputReceiver();
+
+              ElementCounter outputCounter =
+                  new DataflowOutputCounter(
+                      outputName, counterFactory, 
stepContext.getNameContext());
+              undeclaredReceiver.addOutputCounter(outputCounter);
+              undeclaredOutputs.put(tag, undeclaredReceiver);
+              receiver = undeclaredReceiver;
+            }
+
+            try {
+              receiver.process(output);
+            } catch (RuntimeException | Error e) {
+              // Rethrow unchecked exceptions as-is to avoid excessive nesting
+              // via a chain of DoFn's.
+              throw e;
+            } catch (Exception e) {
+              // This should never happen in practice with DoFn's, but can 
happen
+              // with other Receivers.
+              throw new RuntimeException(e);
+            }
+          }
+        };
+    fnInfo = (DoFnInfo) doFnInstanceManager.get();
+    fnSignature = DoFnSignatures.getSignature(fnInfo.getDoFn().getClass());
+
+    fnRunner =
+        runnerFactory.createRunner(
+            fnInfo.getDoFn(),
+            options,
+            mainOutputTag,
+            sideOutputTags,
+            fnInfo.getSideInputViews(),
+            sideInputReader,
+            fnInfo.getInputCoder(),
+            fnInfo.getOutputCoders(),
+            fnInfo.getWindowingStrategy(),
+            stepContext,
+            userStepContext,
+            outputManager,
+            doFnSchemaInformation,
+            sideInputMapping);
+    fnRunner.startBundle();
+  }
+
+  void finishBundle(StreamingSideInputProcessor<?, ?> sideInputProcessor) 
throws Exception {
+    if (fnRunner != null) {
+      fnRunner.finishBundle();
+      if (sideInputProcessor != null) {
+        sideInputProcessor.handleFinishBundle();
+      }
+      doFnInstanceManager.complete(fnInfo);
+      fnRunner = null;
+      fnInfo = null;
+      fnSignature = null;
+      sideInputProcessor = null;
+    }
+  }
+
+  void abort() throws Exception {
+    doFnInstanceManager.abort(fnInfo);
+    fnRunner = null;
+    fnInfo = null;
+  }
+
+  @VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer";
+
+  enum TimerType {
+    USER {
+      @Override
+      public void processTimer(
+          SimpleParDoFnHelpers doFn,
+          TimerInternals.TimerData timer,
+          StreamingSideInputProcessor sideInputProcessor)
+          throws Exception {
+        doFn.processUserTimer(timer, sideInputProcessor);
+      }
+    },
+    FAIL_USER {
+      @Override
+      public void processTimer(
+          SimpleParDoFnHelpers doFn,
+          TimerInternals.TimerData timer,
+          StreamingSideInputProcessor sideInputProcessor)
+          throws Exception {
+        throw new UnsupportedOperationException(
+            "Attempt to deliver a timer to a DoFn, but timers are not 
supported here.");
+      }
+    },
+    SYSTEM {
+      @Override
+      public void processTimer(
+          SimpleParDoFnHelpers doFn,
+          TimerInternals.TimerData timer,
+          StreamingSideInputProcessor sideInputProcessor)
+          throws Exception {
+        doFn.processSystemTimer(timer, sideInputProcessor);
+      }
+    };
+
+    public abstract void processTimer(
+        SimpleParDoFnHelpers doFn,
+        TimerInternals.TimerData timer,
+        StreamingSideInputProcessor sideInputProcessor)
+        throws Exception;
+  };
+
+  void processTimers(
+      TimerType mode,
+      DataflowExecutionContext.DataflowStepContext context,
+      Coder<BoundedWindow> windowCoder,
+      Runnable startKey,
+      StreamingSideInputProcessor<?, ?> sideInputProcessor)
+      throws Exception {
+    TimerInternals.TimerData timer = context.getNextFiredTimer(windowCoder);
+
+    if (timer != null && fnRunner == null) {
+      // If we need to run reallyStartBundle in here, we need to make sure to 
switch the state
+      // sampler into the start state.
+      try (Closeable start = operationContext.enterStart()) {
+        reallyStartBundle();
+        startKey.run();
+      }
+    }
+
+    while (timer != null) {
+      mode.processTimer(this, timer, sideInputProcessor);
+      timer = context.getNextFiredTimer(windowCoder);
+    }
+  }

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   Because Java is pass-by-value, the `sideInputProcessor` argument to
`processTimers` is evaluated at the call site, before the bundle is lazily
started. If `fnRunner` is initially `null`, the value passed is therefore
`null`. Even though `startKey.run()` (which calls `onStartKey()`) subsequently
initializes `sideInputProcessor` on the caller side, the local parameter
`sideInputProcessor` inside `processTimers` remains `null`. As a result,
`mode.processTimer` will receive `null` and the side input checks for timers
will be bypassed.
   
   To fix this, pass a
`java.util.function.Supplier<StreamingSideInputProcessor<?, ?>>` instead of the
processor directly.
   
   ```java
     void processTimers(
         TimerType mode,
         DataflowExecutionContext.DataflowStepContext context,
         Coder<BoundedWindow> windowCoder,
         Runnable startKey,
         java.util.function.Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessorSupplier)
         throws Exception {
       TimerInternals.TimerData timer = context.getNextFiredTimer(windowCoder);

       if (timer != null && fnRunner == null) {
         // If we need to run reallyStartBundle in here, we need to make sure to switch the state
         // sampler into the start state.
         try (Closeable start = operationContext.enterStart()) {
           reallyStartBundle();
           startKey.run();
         }
       }

       while (timer != null) {
         mode.processTimer(this, timer, sideInputProcessorSupplier.get());
         timer = context.getNextFiredTimer(windowCoder);
       }
     }
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -355,180 +177,33 @@ public void processTimers() throws Exception {
     // exist without actually decoding them.
     Coder<BoundedWindow> windowCoder =
         (Coder<BoundedWindow>)
-            (fnInfo != null ? fnInfo : doFnInstanceManager.peek())
+            (helpers.fnInfo != null ? helpers.fnInfo : 
helpers.doFnInstanceManager.peek())
                 .getWindowingStrategy()
                 .getWindowFn()
                 .windowCoder();
-    processTimers(TimerType.USER, userStepContext, windowCoder);
-    processTimers(TimerType.SYSTEM, stepContext, windowCoder);
-  }
-
-  private void processUserTimer(TimerData timer) throws Exception {
-    if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
-        || 
fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
-      BoundedWindow window = ((WindowNamespace) 
timer.getNamespace()).getWindow();
-      fnRunner.onTimer(
-          timer.getTimerId(),
-          timer.getTimerFamilyId(),
-          this.stepContext.stateInternals().getKey(),
-          window,
-          timer.getTimestamp(),
-          timer.getOutputTimestamp(),
-          timer.getDomain(),
-          timer.causedByDrain());
-    }
-  }
-
-  private void processSystemTimer(TimerData timer) throws Exception {
-
-    // Timer owned by this class, for cleaning up state in expired windows
-    if (timer.getTimerId().equals(CLEANUP_TIMER_ID)) {
-      checkState(
-          timer.getDomain().equals(TimeDomain.EVENT_TIME),
-          "%s received cleanup timer with domain not EVENT_TIME: %s",
-          this,
-          timer);
-
-      checkState(
-          timer.getNamespace() instanceof WindowNamespace,
-          "%s received cleanup timer not for a %s: %s",
-          this,
-          WindowNamespace.class.getSimpleName(),
-          timer);
-
-      BoundedWindow window = ((WindowNamespace) 
timer.getNamespace()).getWindow();
-      Instant targetTime = earliestAllowableCleanupTime(window, 
fnInfo.getWindowingStrategy());
-
-      checkState(
-          !targetTime.isAfter(timer.getTimestamp()),
-          "%s received state cleanup timer for window %s "
-              + " that is before the appropriate cleanup time %s",
-          this,
-          window,
-          targetTime);
-
-      fnRunner.onWindowExpiration(
-          window, timer.getOutputTimestamp(), 
this.stepContext.stateInternals().getKey());
-
-      // This is for a timer for a window that is expired, so clean it up.
-      for (StateDeclaration stateDecl : 
fnSignature.stateDeclarations().values()) {
-        StateTag<?> tag;
-        try {
-          tag =
-              StateTags.tagForSpec(
-                  stateDecl.id(), (StateSpec) 
stateDecl.field().get(fnInfo.getDoFn()));
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(
-              String.format(
-                  "Error accessing %s for %s",
-                  StateSpec.class.getName(), 
fnInfo.getDoFn().getClass().getName()),
-              e);
-        }
-
-        StateInternals stateInternals = userStepContext.stateInternals();
-        org.apache.beam.sdk.state.State state = 
stateInternals.state(timer.getNamespace(), tag);
-        state.clear();
-      }
-    }
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.USER,
+        helpers.userStepContext,
+        windowCoder,
+        this::onStartKey,
+        sideInputProcessor);
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.SYSTEM,
+        helpers.stepContext,
+        windowCoder,
+        this::onStartKey,
+        sideInputProcessor);

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   Change the `sideInputProcessor` argument to a lambda supplier `() -> 
sideInputProcessor` to avoid passing a stale `null` reference when the bundle 
is started lazily during timer processing.
   
   ```java
       helpers.processTimers(
           SimpleParDoFnHelpers.TimerType.USER,
           helpers.userStepContext,
           windowCoder,
           this::onStartKey,
           () -> sideInputProcessor);
       helpers.processTimers(
           SimpleParDoFnHelpers.TimerType.SYSTEM,
           helpers.stepContext,
           windowCoder,
           this::onStartKey,
           () -> sideInputProcessor);
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkKitemSideInputParDoFn.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */
+public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W 
extends BoundedWindow>
+    implements ParDoFn {
+  private final StateTag<ValueState<K>> keyAddr;
+  private final Coder<InputT> inputCoder;
+  private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> 
helpers;
+  protected @Nullable StreamingSideInputProcessor<InputT, W> 
sideInputProcessor;
+
+  StreamingKeyedWorkKitemSideInputParDoFn(
+      PipelineOptions options,
+      DoFnInstanceManager doFnInstanceManager,
+      SideInputReader sideInputReader,
+      TupleTag<OutputT> mainOutputTag,
+      Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
+      DataflowExecutionContext.DataflowStepContext stepContext,
+      DataflowOperationContext operationContext,
+      DoFnSchemaInformation doFnSchemaInformation,
+      Map<String, PCollectionView<?>> sideInputMapping,
+      DoFnRunnerFactory runnerFactory,
+      Coder<K> keyCoder,
+      Coder<InputT> inputCoder) {
+    helpers =
+        new SimpleParDoFnHelpers<>(
+            options,
+            doFnInstanceManager,
+            sideInputReader,
+            mainOutputTag,
+            outputTupleTagsToReceiverIndices,
+            stepContext,
+            operationContext,
+            doFnSchemaInformation,
+            sideInputMapping,
+            runnerFactory);
+    this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key", 
keyCoder));
+    this.inputCoder = inputCoder;
+  }
+
+  ValueState<K> keyValue() {
+    return 
helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr);
+  }
+
+  @Override
+  public void startBundle(Receiver... receivers) throws Exception {
+    helpers.startBundle(receivers);
+    if (helpers.hasStreamingSideInput) {
+      // There is non-trivial setup that needs to be performed for watermark 
propagation
+      // even on empty bundles.
+      helpers.reallyStartBundle();
+      onStartKey();
+    }
+  }
+
+  protected void onStartKey() {
+    if (helpers.hasStreamingSideInput) {
+      sideInputProcessor =
+          new StreamingSideInputProcessor<>(
+              new StreamingSideInputFetcher<InputT, W>(
+                  helpers.fnInfo.getSideInputViews(),
+                  inputCoder,
+                  (WindowingStrategy<?, W>) 
helpers.fnInfo.getWindowingStrategy(),
+                  (StreamingModeExecutionContext.StreamingModeStepContext)
+                      helpers.userStepContext));
+    }
+
+    if (sideInputProcessor != null) {
+      boolean hasState = helpers.hasState();
+
+      // TODO(relax): We should be able to get this without writing it to 
state!
+      K key = keyValue().read();
+
+      sideInputProcessor.tryUnblockElementsAndTimers(
+          (unblockedElements, unblockedTimers) -> {
+            if (!Iterables.isEmpty(unblockedElements) || 
!Iterables.isEmpty(unblockedTimers)) {
+              helpers.fnRunner.processElement(
+                  new ValueInEmptyWindows<>(
+                      KeyedWorkItems.workItem(key, unblockedTimers, 
unblockedElements)));
+            }
+            if (hasState) {
+              List<W> windows =
+                  (List<W>)
+                      StreamSupport.stream(unblockedElements.spliterator(), 
false)
+                          .flatMap(wv -> wv.getWindows().stream())
+                          .collect(Collectors.toList());
+              // These elements are now processed. Register cleanup timers for 
all the unblocked
+              // windows.
+              helpers.registerStateCleanup(
+                  (WindowingStrategy<?, W>) 
getDoFnInfo().getWindowingStrategy(), windows);
+            }
+          });
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void processElement(Object untypedElem) throws Exception {
+    if (helpers.fnRunner == null) {
+      // If we need to run reallyStartBundle in here, we need to make sure to 
switch the state
+      // sampler into the start state.
+      try (Closeable start = helpers.operationContext.enterStart()) {
+        helpers.reallyStartBundle();
+        onStartKey();
+      }
+    }
+    helpers.outputsPerElementTracker.onProcessElement();
+
+    WindowedValue<KeyedWorkItem<K, InputT>> elem =
+        (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem;
+    onProcessWindowedValue(elem);
+
+    helpers.outputsPerElementTracker.onProcessElementSuccess();
+  }
+
+  @Override
+  public void processTimers() throws Exception {
+
+    // Note: We need to get windowCoder to decode the timers.  If we haven't 
already deserialized
+    // the fnInfo, we peek at a new instance to retrieve that. If this extra 
deserialization becomes
+    // excessively costly, we could either (1) have the DoFnInstanceManager 
remember the associated
+    // windowCoder (allowing us to get it without a DoFnInfo instance) or (2) 
check whether timers
+    // exist without actually decoding them.
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>)
+            (helpers.fnInfo != null ? helpers.fnInfo : 
helpers.doFnInstanceManager.peek())
+                .getWindowingStrategy()
+                .getWindowFn()
+                .windowCoder();
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.FAIL_USER,
+        helpers.userStepContext,
+        windowCoder,
+        this::onStartKey,
+        sideInputProcessor);
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.SYSTEM,
+        helpers.stepContext,
+        windowCoder,
+        this::onStartKey,
+        sideInputProcessor);

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   Change the `sideInputProcessor` argument to a lambda supplier `() -> 
sideInputProcessor` to avoid passing a stale `null` reference when the bundle 
is started lazily during timer processing.
   
   ```java
       helpers.processTimers(
           SimpleParDoFnHelpers.TimerType.FAIL_USER,
           helpers.userStepContext,
           windowCoder,
           this::onStartKey,
           () -> sideInputProcessor);
       helpers.processTimers(
           SimpleParDoFnHelpers.TimerType.SYSTEM,
           helpers.stepContext,
           windowCoder,
           this::onStartKey,
           () -> sideInputProcessor);
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkKitemSideInputParDoFn.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */
+public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W 
extends BoundedWindow>
+    implements ParDoFn {
+  private final StateTag<ValueState<K>> keyAddr;
+  private final Coder<InputT> inputCoder;
+  private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> 
helpers;
+  protected @Nullable StreamingSideInputProcessor<InputT, W> 
sideInputProcessor;
+
+  StreamingKeyedWorkKitemSideInputParDoFn(
+      PipelineOptions options,
+      DoFnInstanceManager doFnInstanceManager,
+      SideInputReader sideInputReader,
+      TupleTag<OutputT> mainOutputTag,
+      Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
+      DataflowExecutionContext.DataflowStepContext stepContext,
+      DataflowOperationContext operationContext,
+      DoFnSchemaInformation doFnSchemaInformation,
+      Map<String, PCollectionView<?>> sideInputMapping,
+      DoFnRunnerFactory runnerFactory,
+      Coder<K> keyCoder,
+      Coder<InputT> inputCoder) {
+    helpers =
+        new SimpleParDoFnHelpers<>(
+            options,
+            doFnInstanceManager,
+            sideInputReader,
+            mainOutputTag,
+            outputTupleTagsToReceiverIndices,
+            stepContext,
+            operationContext,
+            doFnSchemaInformation,
+            sideInputMapping,
+            runnerFactory);
+    this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key", 
keyCoder));
+    this.inputCoder = inputCoder;
+  }
+
+  ValueState<K> keyValue() {
+    return 
helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr);
+  }
+
+  @Override
+  public void startBundle(Receiver... receivers) throws Exception {
+    helpers.startBundle(receivers);
+    if (helpers.hasStreamingSideInput) {
+      // There is non-trivial setup that needs to be performed for watermark 
propagation
+      // even on empty bundles.
+      helpers.reallyStartBundle();
+      onStartKey();
+    }
+  }
+
+  protected void onStartKey() {
+    if (helpers.hasStreamingSideInput) {
+      sideInputProcessor =
+          new StreamingSideInputProcessor<>(
+              new StreamingSideInputFetcher<InputT, W>(
+                  helpers.fnInfo.getSideInputViews(),
+                  inputCoder,
+                  (WindowingStrategy<?, W>) 
helpers.fnInfo.getWindowingStrategy(),
+                  (StreamingModeExecutionContext.StreamingModeStepContext)
+                      helpers.userStepContext));
+    }
+
+    if (sideInputProcessor != null) {
+      boolean hasState = helpers.hasState();
+
+      // TODO(relax): We should be able to get this without writing it to 
state!
+      K key = keyValue().read();
+
+      sideInputProcessor.tryUnblockElementsAndTimers(
+          (unblockedElements, unblockedTimers) -> {
+            if (!Iterables.isEmpty(unblockedElements) || 
!Iterables.isEmpty(unblockedTimers)) {
+              helpers.fnRunner.processElement(
+                  new ValueInEmptyWindows<>(
+                      KeyedWorkItems.workItem(key, unblockedTimers, 
unblockedElements)));
+            }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `key` is null when there are unblocked elements or timers, calling 
`KeyedWorkItems.workItem(key, ...)` will likely result in a 
`NullPointerException` or invalid state downstream. It is safer to enforce that 
`key` is non-null using `Objects.requireNonNull`.
   
   ```java
         sideInputProcessor.tryUnblockElementsAndTimers(
             (unblockedElements, unblockedTimers) -> {
               if (!Iterables.isEmpty(unblockedElements) || !Iterables.isEmpty(unblockedTimers)) {
                 java.util.Objects.requireNonNull(key, "Key is null but there are unblocked elements or timers");
                 helpers.fnRunner.processElement(
                     new ValueInEmptyWindows<>(
                         KeyedWorkItems.workItem(key, unblockedTimers, unblockedElements)));
               }
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkKitemSideInputParDoFn.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */
+public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W 
extends BoundedWindow>
+    implements ParDoFn {
+  private final StateTag<ValueState<K>> keyAddr;
+  private final Coder<InputT> inputCoder;
+  private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> 
helpers;
+  protected @Nullable StreamingSideInputProcessor<InputT, W> 
sideInputProcessor;
+
+  StreamingKeyedWorkKitemSideInputParDoFn(
+      PipelineOptions options,
+      DoFnInstanceManager doFnInstanceManager,
+      SideInputReader sideInputReader,
+      TupleTag<OutputT> mainOutputTag,
+      Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
+      DataflowExecutionContext.DataflowStepContext stepContext,
+      DataflowOperationContext operationContext,
+      DoFnSchemaInformation doFnSchemaInformation,
+      Map<String, PCollectionView<?>> sideInputMapping,
+      DoFnRunnerFactory runnerFactory,
+      Coder<K> keyCoder,
+      Coder<InputT> inputCoder) {
+    helpers =
+        new SimpleParDoFnHelpers<>(
+            options,
+            doFnInstanceManager,
+            sideInputReader,
+            mainOutputTag,
+            outputTupleTagsToReceiverIndices,
+            stepContext,
+            operationContext,
+            doFnSchemaInformation,
+            sideInputMapping,
+            runnerFactory);
+    this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key", 
keyCoder));
+    this.inputCoder = inputCoder;
+  }
+
+  ValueState<K> keyValue() {
+    return 
helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr);
+  }
+
+  @Override
+  public void startBundle(Receiver... receivers) throws Exception {
+    helpers.startBundle(receivers);
+    if (helpers.hasStreamingSideInput) {
+      // There is non-trivial setup that needs to be performed for watermark 
propagation
+      // even on empty bundles.
+      helpers.reallyStartBundle();
+      onStartKey();
+    }
+  }
+
+  protected void onStartKey() {
+    if (helpers.hasStreamingSideInput) {
+      sideInputProcessor =
+          new StreamingSideInputProcessor<>(
+              new StreamingSideInputFetcher<InputT, W>(
+                  helpers.fnInfo.getSideInputViews(),
+                  inputCoder,
+                  (WindowingStrategy<?, W>) 
helpers.fnInfo.getWindowingStrategy(),
+                  (StreamingModeExecutionContext.StreamingModeStepContext)
+                      helpers.userStepContext));
+    }
+
+    if (sideInputProcessor != null) {
+      boolean hasState = helpers.hasState();
+
+      // TODO(relax): We should be able to get this without writing it to 
state!
+      K key = keyValue().read();
+
+      sideInputProcessor.tryUnblockElementsAndTimers(
+          (unblockedElements, unblockedTimers) -> {
+            if (!Iterables.isEmpty(unblockedElements) || 
!Iterables.isEmpty(unblockedTimers)) {
+              helpers.fnRunner.processElement(
+                  new ValueInEmptyWindows<>(
+                      KeyedWorkItems.workItem(key, unblockedTimers, 
unblockedElements)));
+            }
+            if (hasState) {
+              List<W> windows =
+                  (List<W>)
+                      StreamSupport.stream(unblockedElements.spliterator(), 
false)
+                          .flatMap(wv -> wv.getWindows().stream())
+                          .collect(Collectors.toList());
+              // These elements are now processed. Register cleanup timers for 
all the unblocked
+              // windows.
+              helpers.registerStateCleanup(
+                  (WindowingStrategy<?, W>) 
getDoFnInfo().getWindowingStrategy(), windows);
+            }
+          });
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void processElement(Object untypedElem) throws Exception {
+    if (helpers.fnRunner == null) {
+      // If we need to run reallyStartBundle in here, we need to make sure to 
switch the state
+      // sampler into the start state.
+      try (Closeable start = helpers.operationContext.enterStart()) {
+        helpers.reallyStartBundle();
+        onStartKey();
+      }
+    }
+    helpers.outputsPerElementTracker.onProcessElement();
+
+    WindowedValue<KeyedWorkItem<K, InputT>> elem =
+        (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem;
+    onProcessWindowedValue(elem);
+
+    helpers.outputsPerElementTracker.onProcessElementSuccess();
+  }
+
+  @Override
+  public void processTimers() throws Exception {
+
+    // Note: We need to get windowCoder to decode the timers.  If we haven't 
already deserialized
+    // the fnInfo, we peek at a new instance to retrieve that. If this extra 
deserialization becomes
+    // excessively costly, we could either (1) have the DoFnInstanceManager 
remember the associated
+    // windowCoder (allowing us to get it without a DoFnInfo instance) or (2) 
check whether timers
+    // exist without actually decoding them.
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>)
+            (helpers.fnInfo != null ? helpers.fnInfo : 
helpers.doFnInstanceManager.peek())
+                .getWindowingStrategy()
+                .getWindowFn()
+                .windowCoder();
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.FAIL_USER,
+        helpers.userStepContext,
+        windowCoder,
+        this::onStartKey,
+        sideInputProcessor);
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.SYSTEM,
+        helpers.stepContext,
+        windowCoder,
+        this::onStartKey,
+        sideInputProcessor);
+  }
+
+  @Override
+  public void finishBundle() throws Exception {
+    helpers.finishBundle(sideInputProcessor);
+    this.sideInputProcessor = null;
+  }
+
+  @Override
+  public void abort() throws Exception {
+    helpers.abort();
+  }
+
+  protected void onProcessWindowedValue(WindowedValue<KeyedWorkItem<K, 
InputT>> elem) {
+    // TODO: Get rid of this!
+    final K key = elem.getValue().key();
+    keyValue().write(key);
+
+    boolean hasState = helpers.hasState();
+    Collection<W> windowsProcessed;
+    if (sideInputProcessor != null) {
+      windowsProcessed = hasState ? Lists.newArrayList() : 
Collections.emptyList();
+      KeyedWorkItem<K, InputT> unblocked = 
sideInputProcessor.handleProcessKeyedWorkItem(elem);
+      if (!Iterables.isEmpty(unblocked.elementsIterable())
+          || !Iterables.isEmpty(unblocked.timersIterable())) {
+        helpers.fnRunner.processElement(elem.withValue(unblocked));
+      }
+      if (hasState) {
+        windowsProcessed =
+            (Collection<W>)
+                
StreamSupport.stream(unblocked.elementsIterable().spliterator(), false)
+                    .flatMap(wv -> wv.getWindows().stream())
+                    .collect(Collectors.toList());
+      }
+    } else {
+      helpers.fnRunner.processElement(elem);
+      windowsProcessed = (Collection<W>) elem.getWindows();
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   In the `else` branch, `elem` is of type `WindowedValue<KeyedWorkItem<K, 
InputT>>`. Calling `elem.getWindows()` will return the windows of the 
`KeyedWorkItem` wrapper (which are typically empty, e.g. when wrapped in 
`ValueInEmptyWindows`), rather than the actual windows of the elements inside 
the `KeyedWorkItem`. This means state cleanup will not be registered for the 
correct windows.
   
   You should extract the windows from the elements inside the 
`KeyedWorkItem`, similar to how it is done in the `if` branch.
   
   ```java
       } else {
         helpers.fnRunner.processElement(elem);
         windowsProcessed =
             (Collection<W>)
                 StreamSupport.stream(elem.getValue().elementsIterable().spliterator(), false)
                     .flatMap(wv -> wv.getWindows().stream())
                     .collect(Collectors.toList());
       }
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkKitemSideInputParDoFn.java:
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */
+public class StreamingKeyedWorkKitemSideInputParDoFn<K, InputT, OutputT, W 
extends BoundedWindow>

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   There is a typo in the class name `StreamingKeyedWorkKitemSideInputParDoFn` 
(contains `WorkKitem` instead of `WorkItem`). Please rename the class and the 
file to `StreamingKeyedWorkItemSideInputParDoFn`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to