This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0856c22a9e9 Side Input improvements (#38363)
0856c22a9e9 is described below

commit 0856c22a9e92f7a7b5fb072139fdad2b14160228
Author: Reuven Lax <[email protected]>
AuthorDate: Tue Jun 9 11:10:17 2026 -0700

    Side Input improvements (#38363)
    
    * side-input improvements
    
    * remove files
    
    * add Override
    
    * foo
    
    * foo
    
    * fix splittable dofn
    
    * fix
    
    * foo
    
    * fix windows
    
    * fix compilation
    
    * foo
---
 .../apache/beam/runners/core/SimpleDoFnRunner.java |  26 +-
 .../dataflow/worker/SimpleDoFnRunnerFactory.java   |  12 -
 .../runners/dataflow/worker/SimpleParDoFn.java     | 533 ++++-----------------
 ...impleParDoFn.java => SimpleParDoFnHelpers.java} | 409 ++++++++--------
 .../worker/SplittableProcessFnFactory.java         |  20 +-
 .../StreamingKeyedWorkItemSideInputParDoFn.java    | 246 ++++++++++
 .../worker/StreamingSideInputDoFnRunner.java       |  41 +-
 .../dataflow/worker/StreamingSideInputFetcher.java |  34 +-
 .../worker/StreamingSideInputProcessor.java        | 132 +++++
 .../dataflow/worker/UserParDoFnFactory.java        |  56 ++-
 .../dataflow/worker/SimpleParDoFnHelpersTest.java  | 133 +++++
 .../runners/dataflow/worker/SimpleParDoFnTest.java |   6 +-
 ...eamingKeyedWorkItemSideInputDoFnRunnerTest.java |   3 +-
 ...StreamingKeyedWorkItemSideInputParDoFnTest.java | 488 +++++++++++++++++++
 .../worker/StreamingSideInputProcessorTest.java    | 215 +++++++++
 .../dataflow/worker/UserParDoFnFactoryTest.java    |  27 +-
 runners/prism/java/build.gradle                    |   3 +
 .../beam/sdk/testing/UsesSideInputsInTimer.java    |  27 ++
 .../java/org/apache/beam/sdk/transforms/DoFn.java  |  16 +
 .../sdk/transforms/reflect/DoFnSignatures.java     |   9 +-
 .../org/apache/beam/sdk/transforms/ParDoTest.java  | 149 ++++++
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    |  28 ++
 22 files changed, 1854 insertions(+), 759 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 9859d672b3e..470e22a6699 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -870,8 +870,17 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     }
 
     @Override
-    public Object sideInput(String tagId) {
-      throw new UnsupportedOperationException("SideInput parameters are not 
supported.");
+    public @Nullable Object sideInput(String tagId) {
+      PCollectionView<?> view =
+          checkStateNotNull(sideInputMapping.get(tagId), "Side input tag %s 
not found", tagId);
+      return sideInput(view);
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      checkNotNull(view, "View passed to sideInput cannot be null");
+      return SimpleDoFnRunner.this.sideInput(
+          view, view.getWindowMappingFn().getSideInputWindow(window()));
     }
 
     @Override
@@ -1196,8 +1205,17 @@ public class SimpleDoFnRunner<InputT, OutputT> 
implements DoFnRunner<InputT, Out
     }
 
     @Override
-    public Object sideInput(String tagId) {
-      throw new UnsupportedOperationException("SideInput parameters are not 
supported.");
+    public @Nullable Object sideInput(String tagId) {
+      PCollectionView<?> view =
+          checkStateNotNull(sideInputMapping.get(tagId), "Side input tag %s 
not found", tagId);
+      return sideInput(view);
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      checkNotNull(view, "View passed to sideInput cannot be null");
+      return SimpleDoFnRunner.this.sideInput(
+          view, view.getWindowMappingFn().getSideInputWindow(window()));
     }
 
     @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java
index 52fcec439aa..5286fc1aae9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleDoFnRunnerFactory.java
@@ -24,7 +24,6 @@ import org.apache.beam.runners.core.DoFnRunners;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
@@ -68,17 +67,6 @@ class SimpleDoFnRunnerFactory<InputT, OutputT> implements 
DoFnRunnerFactory<Inpu
             windowingStrategy,
             doFnSchemaInformation,
             sideInputMapping);
-    boolean hasStreamingSideInput =
-        options.as(StreamingOptions.class).isStreaming() && 
!sideInputReader.isEmpty();
-    if (hasStreamingSideInput) {
-      return new StreamingSideInputDoFnRunner<>(
-          fnRunner,
-          new StreamingSideInputFetcher<>(
-              sideInputViews,
-              inputCoder,
-              windowingStrategy,
-              (StreamingModeExecutionContext.StreamingModeStepContext) 
userStepContext));
-    }
     return fnRunner;
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 434d46c20a5..34dff6b8835 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -17,53 +17,26 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
-import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
-
 import java.io.Closeable;
 import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.Map;
-import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.SideInputReader;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.runners.dataflow.worker.counters.Counter;
-import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
-import org.apache.beam.runners.dataflow.worker.counters.CounterName;
-import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
-import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.util.DoFnInfo;
-import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.checkerframework.checker.nullness.qual.Nullable;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A base class providing simple set up, processing, and tear down for a 
wrapped {@link
@@ -76,41 +49,9 @@ import org.slf4j.LoggerFactory;
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
-
-  // TODO: Remove once Distributions has shipped.
-  @VisibleForTesting
-  static final String OUTPUTS_PER_ELEMENT_EXPERIMENT = 
"outputs_per_element_counter";
-
-  private static final String COUNTER_NAME = "per-element-output-count";
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(SimpleParDoFn.class);
-
-  protected final PipelineOptions options;
-  private final DoFnInstanceManager doFnInstanceManager;
-
-  private final SideInputReader sideInputReader;
-  private final DataflowOperationContext operationContext;
-  private final TupleTag<OutputT> mainOutputTag;
-  private final Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices;
-  private final List<TupleTag<?>> sideOutputTags;
-  private final DataflowExecutionContext.DataflowStepContext stepContext;
-  private final DataflowExecutionContext.DataflowStepContext userStepContext;
-  private final CounterFactory counterFactory;
-  private final DoFnRunnerFactory runnerFactory;
-  private final boolean hasStreamingSideInput;
-  private final OutputsPerElementTracker outputsPerElementTracker;
-  private final DoFnSchemaInformation doFnSchemaInformation;
-  private final Map<String, PCollectionView<?>> sideInputMapping;
-
-  // Various DoFn helpers, null between bundles
-  private @Nullable DoFnRunner<InputT, OutputT> fnRunner;
-  @Nullable DoFnInfo<InputT, OutputT> fnInfo;
-  private Receiver @Nullable [] receivers;
-
-  // This may additionally be null if it is not a real DoFn but an OldDoFn or
-  // GroupAlsoByWindowViaWindowSetDoFn
-  private @Nullable DoFnSignature fnSignature;
+public class SimpleParDoFn<InputT, OutputT, W extends BoundedWindow> 
implements ParDoFn {
+  private final SimpleParDoFnHelpers<InputT, OutputT, W> helpers;
+  private @Nullable StreamingSideInputProcessor<InputT, W> sideInputProcessor;
 
   /** Creates a {@link SimpleParDoFn} using basic information about the step 
being executed. */
   SimpleParDoFn(
@@ -124,225 +65,106 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
       DoFnSchemaInformation doFnSchemaInformation,
       Map<String, PCollectionView<?>> sideInputMapping,
       DoFnRunnerFactory runnerFactory) {
-    this.options = options;
-    this.doFnInstanceManager = doFnInstanceManager;
-
-    // We vend a freshly deserialized version for each run
-    this.sideInputReader = sideInputReader;
-    this.operationContext = operationContext;
-    checkArgument(!outputTupleTagsToReceiverIndices.isEmpty(), "expected at 
least one output");
-    this.mainOutputTag = mainOutputTag;
-    this.outputTupleTagsToReceiverIndices = outputTupleTagsToReceiverIndices;
-    ImmutableList.Builder<TupleTag<?>> sideOutputTagsBuilder = 
ImmutableList.builder();
-    for (TupleTag<?> tag : outputTupleTagsToReceiverIndices.keySet()) {
-      if (!mainOutputTag.equals(tag)) {
-        sideOutputTagsBuilder.add(tag);
-      }
-    }
-    this.sideOutputTags = sideOutputTagsBuilder.build();
-    this.stepContext = stepContext;
-
-    // StepContext provides a TimerInternals and StateInternals for use by the 
system - this class.
-    // For the user, we request a user-scoped StepContext to provide a 
user-scoped
-    // StateInternals and TimerInternals.
-    this.userStepContext = stepContext.namespacedToUser();
-
-    this.counterFactory = operationContext.counterFactory();
-    this.runnerFactory = runnerFactory;
-    this.hasStreamingSideInput =
-        options.as(StreamingOptions.class).isStreaming() && 
!sideInputReader.isEmpty();
-    this.outputsPerElementTracker = createOutputsPerElementTracker();
-    this.doFnSchemaInformation = doFnSchemaInformation;
-    this.sideInputMapping = sideInputMapping;
-  }
-
-  private OutputsPerElementTracker createOutputsPerElementTracker() {
-    // TODO: Remove once Distributions has shipped.
-    if (!hasExperiment(OUTPUTS_PER_ELEMENT_EXPERIMENT)) {
-      return NoopOutputsPerElementTracker.INSTANCE;
-    }
-
-    // TODO: Remove log statement when functionality is enabled by default.
-    LOG.info("{} counter enabled.", COUNTER_NAME);
-
-    return new OutputsPerElementTrackerImpl();
-  }
-
-  private boolean hasExperiment(String experiment) {
-    List<String> experiments = 
options.as(DataflowPipelineDebugOptions.class).getExperiments();
-    return experiments != null && experiments.contains(experiment);
-  }
-
-  /** Simple state tracker to calculate PerElementOutputCount counter. */
-  private interface OutputsPerElementTracker {
-
-    void onOutput();
-
-    void onProcessElement();
-
-    void onProcessElementSuccess();
-  }
-
-  private class OutputsPerElementTrackerImpl implements 
OutputsPerElementTracker {
-
-    private long outputsPerElement;
-    private final Counter<Long, CounterFactory.CounterDistribution> counter;
-
-    public OutputsPerElementTrackerImpl() {
-      this.counter =
-          counterFactory.distribution(
-              
CounterName.named(COUNTER_NAME).withOriginalName(stepContext.getNameContext()));
-    }
-
-    @Override
-    public void onProcessElement() {
-      reset();
-    }
-
-    @Override
-    public void onOutput() {
-      outputsPerElement++;
-    }
-
-    @Override
-    public void onProcessElementSuccess() {
-      counter.addValue(outputsPerElement);
-      reset();
-    }
-
-    private void reset() {
-      outputsPerElement = 0L;
-    }
-  }
-
-  /** No-op {@link OutputsPerElementTracker} implementation used when the 
counter is disabled. */
-  private static class NoopOutputsPerElementTracker implements 
OutputsPerElementTracker {
-
-    private NoopOutputsPerElementTracker() {}
-
-    public static final OutputsPerElementTracker INSTANCE = new 
NoopOutputsPerElementTracker();
-
-    @Override
-    public void onOutput() {}
-
-    @Override
-    public void onProcessElement() {}
-
-    @Override
-    public void onProcessElementSuccess() {}
+    this.helpers =
+        new SimpleParDoFnHelpers<>(
+            options,
+            doFnInstanceManager,
+            sideInputReader,
+            mainOutputTag,
+            outputTupleTagsToReceiverIndices,
+            stepContext,
+            operationContext,
+            doFnSchemaInformation,
+            sideInputMapping,
+            runnerFactory);
   }
 
   @Override
   public void startBundle(Receiver... receivers) throws Exception {
-    checkArgument(
-        receivers.length == outputTupleTagsToReceiverIndices.size(),
-        "unexpected number of receivers for DoFn");
-
-    this.receivers = receivers;
-    if (hasStreamingSideInput) {
+    helpers.startBundle(receivers);
+    if (helpers.hasStreamingSideInput) {
       // There is non-trivial setup that needs to be performed for watermark 
propagation
       // even on empty bundles.
-      reallyStartBundle();
-    }
-  }
-
-  private void reallyStartBundle() throws Exception {
-    checkState(fnRunner == null, "bundle already started (or not properly 
finished)");
-
-    WindowedValueMultiReceiver outputManager =
-        new WindowedValueMultiReceiver() {
-          final Map<TupleTag<?>, OutputReceiver> undeclaredOutputs = new 
HashMap<>();
-
-          private @Nullable Receiver getReceiverOrNull(TupleTag<?> tag) {
-            Integer receiverIndex = outputTupleTagsToReceiverIndices.get(tag);
-            if (receiverIndex != null) {
-              return receivers[receiverIndex];
-            } else {
-              return undeclaredOutputs.get(tag);
-            }
-          }
-
-          @Override
-          public <TagT> void output(TupleTag<TagT> tag, WindowedValue<TagT> 
output) {
-            outputsPerElementTracker.onOutput();
-            Receiver receiver = getReceiverOrNull(tag);
-            if (receiver == null) {
-              // A new undeclared output.
-              // TODO: plumb through the operationName, so that we can
-              // name implicit outputs after it.
-              String outputName = "implicit-" + tag.getId();
-              // TODO: plumb through the counter prefix, so we can
-              // make it available to the OutputReceiver class in case
-              // it wants to use it in naming output counterFactory.  (It
-              // doesn't today.)
-              OutputReceiver undeclaredReceiver = new OutputReceiver();
-
-              ElementCounter outputCounter =
-                  new DataflowOutputCounter(
-                      outputName, counterFactory, 
stepContext.getNameContext());
-              undeclaredReceiver.addOutputCounter(outputCounter);
-              undeclaredOutputs.put(tag, undeclaredReceiver);
-              receiver = undeclaredReceiver;
+      helpers.reallyStartBundle();
+      onStartKey();
+    }
+  }
+
+  protected void onStartKey() {
+    // TODO(relax): This assumes single-key bundles, which will change! 
Refactor this to not make
+    // this assumption.
+    if (helpers.hasStreamingSideInput) {
+      sideInputProcessor =
+          new StreamingSideInputProcessor<>(
+              new StreamingSideInputFetcher<InputT, W>(
+                  helpers.fnInfo.getSideInputViews(),
+                  helpers.fnInfo.getInputCoder(),
+                  (WindowingStrategy<?, W>) 
helpers.fnInfo.getWindowingStrategy(),
+                  (StreamingModeExecutionContext.StreamingModeStepContext)
+                      helpers.userStepContext));
+
+      boolean hasState = helpers.hasState();
+      sideInputProcessor.tryUnblockElements(
+          unblockedElements -> {
+            for (WindowedValue<InputT> unblockedElement : unblockedElements) {
+              helpers.fnRunner.processElement(unblockedElement);
+              if (hasState) {
+                // These elements are now processed. Register cleanup timers 
for all the unblocked
+                // windows.
+                helpers.registerStateCleanup(
+                    (WindowingStrategy<?, W>) 
getDoFnInfo().getWindowingStrategy(),
+                    (Collection<W>) unblockedElement.getWindows());
+              }
             }
-
-            try {
-              receiver.process(output);
-            } catch (RuntimeException | Error e) {
-              // Rethrow unchecked exceptions as-is to avoid excessive nesting
-              // via a chain of DoFn's.
-              throw e;
-            } catch (Exception e) {
-              // This should never happen in practice with DoFn's, but can 
happen
-              // with other Receivers.
-              throw new RuntimeException(e);
-            }
-          }
-        };
-    fnInfo = (DoFnInfo) doFnInstanceManager.get();
-    fnSignature = DoFnSignatures.getSignature(fnInfo.getDoFn().getClass());
-
-    fnRunner =
-        runnerFactory.createRunner(
-            fnInfo.getDoFn(),
-            options,
-            mainOutputTag,
-            sideOutputTags,
-            fnInfo.getSideInputViews(),
-            sideInputReader,
-            fnInfo.getInputCoder(),
-            fnInfo.getOutputCoders(),
-            fnInfo.getWindowingStrategy(),
-            stepContext,
-            userStepContext,
-            outputManager,
-            doFnSchemaInformation,
-            sideInputMapping);
-
-    fnRunner.startBundle();
+          });
+    }
   }
 
   @Override
   @SuppressWarnings("unchecked")
   public void processElement(Object untypedElem) throws Exception {
-    if (fnRunner == null) {
+    if (helpers.fnRunner == null) {
       // If we need to run reallyStartBundle in here, we need to make sure to 
switch the state
       // sampler into the start state.
-      try (Closeable start = operationContext.enterStart()) {
-        reallyStartBundle();
+      try (Closeable start = helpers.operationContext.enterStart()) {
+        helpers.reallyStartBundle();
+        onStartKey();
       }
     }
+    helpers.outputsPerElementTracker.onProcessElement();
 
     WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
-
-    if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
-      registerStateCleanup(
-          (WindowingStrategy<?, BoundedWindow>) 
getDoFnInfo().getWindowingStrategy(),
-          (Collection<BoundedWindow>) elem.getWindows());
+    onProcessWindowedValue(elem);
+
+    helpers.outputsPerElementTracker.onProcessElementSuccess();
+  }
+
+  protected void onProcessWindowedValue(WindowedValue<InputT> elem) {
+    boolean hasState = helpers.hasState();
+
+    Collection<W> windowsProcessed;
+    if (sideInputProcessor != null) {
+      windowsProcessed = hasState ? Lists.newArrayList() : 
Collections.emptyList();
+      for (Iterator<? extends WindowedValue<InputT>> it =
+              sideInputProcessor.handleProcessElement(elem);
+          it.hasNext(); ) {
+        WindowedValue<InputT> toProcess = it.next();
+        helpers.fnRunner.processElement(toProcess);
+        if (hasState) {
+          windowsProcessed.addAll((Collection<W>) toProcess.getWindows());
+          // If the element was blocked, don't register a cleanup timer. The 
timer will be
+          // registered
+          // when the window is unblocked ensuring that it is not processed 
until the element is.
+        }
+      }
+    } else {
+      helpers.fnRunner.processElement(elem);
+      windowsProcessed = (Collection<W>) elem.getWindows();
+    }
+    if (hasState) {
+      helpers.registerStateCleanup(
+          (WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(), 
windowsProcessed);
     }
-
-    outputsPerElementTracker.onProcessElement();
-    fnRunner.processElement(elem);
-    outputsPerElementTracker.onProcessElementSuccess();
   }
 
   @Override
@@ -355,180 +177,33 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
     // exist without actually decoding them.
     Coder<BoundedWindow> windowCoder =
         (Coder<BoundedWindow>)
-            (fnInfo != null ? fnInfo : doFnInstanceManager.peek())
+            (helpers.fnInfo != null ? helpers.fnInfo : 
helpers.doFnInstanceManager.peek())
                 .getWindowingStrategy()
                 .getWindowFn()
                 .windowCoder();
-    processTimers(TimerType.USER, userStepContext, windowCoder);
-    processTimers(TimerType.SYSTEM, stepContext, windowCoder);
-  }
-
-  private void processUserTimer(TimerData timer) throws Exception {
-    if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
-        || 
fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
-      BoundedWindow window = ((WindowNamespace) 
timer.getNamespace()).getWindow();
-      fnRunner.onTimer(
-          timer.getTimerId(),
-          timer.getTimerFamilyId(),
-          this.stepContext.stateInternals().getKey(),
-          window,
-          timer.getTimestamp(),
-          timer.getOutputTimestamp(),
-          timer.getDomain(),
-          timer.causedByDrain());
-    }
-  }
-
-  private void processSystemTimer(TimerData timer) throws Exception {
-
-    // Timer owned by this class, for cleaning up state in expired windows
-    if (timer.getTimerId().equals(CLEANUP_TIMER_ID)) {
-      checkState(
-          timer.getDomain().equals(TimeDomain.EVENT_TIME),
-          "%s received cleanup timer with domain not EVENT_TIME: %s",
-          this,
-          timer);
-
-      checkState(
-          timer.getNamespace() instanceof WindowNamespace,
-          "%s received cleanup timer not for a %s: %s",
-          this,
-          WindowNamespace.class.getSimpleName(),
-          timer);
-
-      BoundedWindow window = ((WindowNamespace) 
timer.getNamespace()).getWindow();
-      Instant targetTime = earliestAllowableCleanupTime(window, 
fnInfo.getWindowingStrategy());
-
-      checkState(
-          !targetTime.isAfter(timer.getTimestamp()),
-          "%s received state cleanup timer for window %s "
-              + " that is before the appropriate cleanup time %s",
-          this,
-          window,
-          targetTime);
-
-      fnRunner.onWindowExpiration(
-          window, timer.getOutputTimestamp(), 
this.stepContext.stateInternals().getKey());
-
-      // This is for a timer for a window that is expired, so clean it up.
-      for (StateDeclaration stateDecl : 
fnSignature.stateDeclarations().values()) {
-        StateTag<?> tag;
-        try {
-          tag =
-              StateTags.tagForSpec(
-                  stateDecl.id(), (StateSpec) 
stateDecl.field().get(fnInfo.getDoFn()));
-        } catch (IllegalAccessException e) {
-          throw new RuntimeException(
-              String.format(
-                  "Error accessing %s for %s",
-                  StateSpec.class.getName(), 
fnInfo.getDoFn().getClass().getName()),
-              e);
-        }
-
-        StateInternals stateInternals = userStepContext.stateInternals();
-        org.apache.beam.sdk.state.State state = 
stateInternals.state(timer.getNamespace(), tag);
-        state.clear();
-      }
-    }
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.USER,
+        helpers.userStepContext,
+        windowCoder,
+        this::onStartKey,
+        () -> sideInputProcessor);
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.SYSTEM,
+        helpers.stepContext,
+        windowCoder,
+        this::onStartKey,
+        () -> sideInputProcessor);
   }
 
   @Override
   public void finishBundle() throws Exception {
-    if (fnRunner != null) {
-      fnRunner.finishBundle();
-      doFnInstanceManager.complete(fnInfo);
-      fnRunner = null;
-      fnInfo = null;
-      fnSignature = null;
-    }
+    helpers.finishBundle(sideInputProcessor);
+    this.sideInputProcessor = null;
   }
 
   @Override
   public void abort() throws Exception {
-    doFnInstanceManager.abort(fnInfo);
-    fnRunner = null;
-    fnInfo = null;
-  }
-
-  @VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer";
-
-  private enum TimerType {
-    USER {
-      @Override
-      public void processTimer(SimpleParDoFn doFn, TimerData timer) throws 
Exception {
-        doFn.processUserTimer(timer);
-      }
-    },
-    SYSTEM {
-      @Override
-      public void processTimer(SimpleParDoFn doFn, TimerData timer) throws 
Exception {
-        doFn.processSystemTimer(timer);
-      }
-    };
-
-    public abstract void processTimer(SimpleParDoFn doFn, TimerData timer) 
throws Exception;
-  };
-
-  private void processTimers(
-      TimerType mode,
-      DataflowExecutionContext.DataflowStepContext context,
-      Coder<BoundedWindow> windowCoder)
-      throws Exception {
-    TimerData timer = context.getNextFiredTimer(windowCoder);
-
-    if (timer != null && fnRunner == null) {
-      // If we need to run reallyStartBundle in here, we need to make sure to 
switch the state
-      // sampler into the start state.
-      try (Closeable start = operationContext.enterStart()) {
-        reallyStartBundle();
-      }
-    }
-
-    while (timer != null) {
-      mode.processTimer(this, timer);
-      timer = context.getNextFiredTimer(windowCoder);
-    }
-  }
-
-  private <W extends BoundedWindow> void registerStateCleanup(
-      WindowingStrategy<?, W> windowingStrategy, Collection<W> 
windowsToCleanup) {
-    Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
-
-    for (W window : windowsToCleanup) {
-      // The stepContext is the thing that know if it is batch or streaming, 
hence
-      // whether state needs to be cleaned up or will simply be discarded so 
the
-      // timer can be ignored.
-      Instant cleanupTime = earliestAllowableCleanupTime(window, 
windowingStrategy);
-      // Set a cleanup timer for state at the end of the window to trigger 
onWindowExpiration and
-      // garbage collect state. We avoid doing this for the global window if 
there is no window
-      // expiration set as the state will be up when the pipeline terminates. 
Setting the timer
-      // leads to a unbounded growth of timers for pipelines with many unique 
keys in the global
-      // window.
-      if (cleanupTime.isBefore(GlobalWindow.INSTANCE.maxTimestamp())
-          || fnSignature.onWindowExpiration() != null) {
-        // If the DoFn has OnWindowExpiration, then set the watermark hold so 
that the watermark
-        // does
-        // not advance until OnWindowExpiration completes.
-        Instant cleanupOutputTimestamp =
-            fnSignature.onWindowExpiration() == null
-                ? cleanupTime
-                : cleanupTime.minus(Duration.millis(1L));
-        stepContext.setStateCleanupTimer(
-            CLEANUP_TIMER_ID, window, windowCoder, cleanupTime, 
cleanupOutputTimestamp);
-      }
-    }
-  }
-
-  private Instant earliestAllowableCleanupTime(
-      BoundedWindow window, WindowingStrategy windowingStrategy) {
-    Instant cleanupTime =
-        window
-            .maxTimestamp()
-            .plus(windowingStrategy.getAllowedLateness())
-            .plus(Duration.millis(1L));
-    return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
-        ? BoundedWindow.TIMESTAMP_MAX_VALUE
-        : cleanupTime;
+    helpers.abort();
   }
 
   /**
@@ -540,6 +215,6 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
   @VisibleForTesting
   @Nullable
   DoFnInfo<?, ?> getDoFnInfo() {
-    return fnInfo;
+    return helpers.fnInfo;
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java
similarity index 76%
copy from 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
copy to 
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java
index 434d46c20a5..964cf2323d5 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpers.java
@@ -25,29 +25,29 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.worker.counters.Counter;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter;
 import 
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver;
-import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.StateDeclaration;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -65,18 +65,12 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * A base class providing simple set up, processing, and tear down for a 
wrapped {@link
- * GroupAlsoByWindowFn}.
- *
- * <p>Subclasses override just a method to provide a {@link DoFnInfo} for the 
wrapped {@link
- * GroupAlsoByWindowFn}.
- */
 @SuppressWarnings({
   "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
-public class SimpleParDoFn<InputT, OutputT> implements ParDoFn {
+class SimpleParDoFnHelpers<InputT, OutputT, W extends BoundedWindow> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SimpleParDoFnHelpers.class);
 
   // TODO: Remove once Distributions has shipped.
   @VisibleForTesting
@@ -84,36 +78,33 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
 
   private static final String COUNTER_NAME = "per-element-output-count";
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(SimpleParDoFn.class);
-
-  protected final PipelineOptions options;
-  private final DoFnInstanceManager doFnInstanceManager;
+  final PipelineOptions options;
+  final DoFnInstanceManager doFnInstanceManager;
 
   private final SideInputReader sideInputReader;
-  private final DataflowOperationContext operationContext;
+  final DataflowOperationContext operationContext;
   private final TupleTag<OutputT> mainOutputTag;
   private final Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices;
   private final List<TupleTag<?>> sideOutputTags;
-  private final DataflowExecutionContext.DataflowStepContext stepContext;
-  private final DataflowExecutionContext.DataflowStepContext userStepContext;
+  final DataflowExecutionContext.DataflowStepContext stepContext;
+  final DataflowExecutionContext.DataflowStepContext userStepContext;
   private final CounterFactory counterFactory;
   private final DoFnRunnerFactory runnerFactory;
-  private final boolean hasStreamingSideInput;
-  private final OutputsPerElementTracker outputsPerElementTracker;
+  final boolean hasStreamingSideInput;
+  final OutputsPerElementTracker outputsPerElementTracker;
   private final DoFnSchemaInformation doFnSchemaInformation;
   private final Map<String, PCollectionView<?>> sideInputMapping;
 
   // Various DoFn helpers, null between bundles
-  private @Nullable DoFnRunner<InputT, OutputT> fnRunner;
+  @Nullable DoFnRunner<InputT, OutputT> fnRunner;
   @Nullable DoFnInfo<InputT, OutputT> fnInfo;
   private Receiver @Nullable [] receivers;
 
   // This may additionally be null if it is not a real DoFn but an OldDoFn or
   // GroupAlsoByWindowViaWindowSetDoFn
-  private @Nullable DoFnSignature fnSignature;
+  protected @Nullable DoFnSignature fnSignature;
 
-  /** Creates a {@link SimpleParDoFn} using basic information about the step 
being executed. */
-  SimpleParDoFn(
+  SimpleParDoFnHelpers(
       PipelineOptions options,
       DoFnInstanceManager doFnInstanceManager,
       SideInputReader sideInputReader,
@@ -156,97 +147,19 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
     this.sideInputMapping = sideInputMapping;
   }
 
-  private OutputsPerElementTracker createOutputsPerElementTracker() {
-    // TODO: Remove once Distributions has shipped.
-    if (!hasExperiment(OUTPUTS_PER_ELEMENT_EXPERIMENT)) {
-      return NoopOutputsPerElementTracker.INSTANCE;
-    }
-
-    // TODO: Remove log statement when functionality is enabled by default.
-    LOG.info("{} counter enabled.", COUNTER_NAME);
-
-    return new OutputsPerElementTrackerImpl();
-  }
-
-  private boolean hasExperiment(String experiment) {
-    List<String> experiments = 
options.as(DataflowPipelineDebugOptions.class).getExperiments();
-    return experiments != null && experiments.contains(experiment);
-  }
-
-  /** Simple state tracker to calculate PerElementOutputCount counter. */
-  private interface OutputsPerElementTracker {
-
-    void onOutput();
-
-    void onProcessElement();
-
-    void onProcessElementSuccess();
-  }
-
-  private class OutputsPerElementTrackerImpl implements 
OutputsPerElementTracker {
-
-    private long outputsPerElement;
-    private final Counter<Long, CounterFactory.CounterDistribution> counter;
-
-    public OutputsPerElementTrackerImpl() {
-      this.counter =
-          counterFactory.distribution(
-              
CounterName.named(COUNTER_NAME).withOriginalName(stepContext.getNameContext()));
-    }
-
-    @Override
-    public void onProcessElement() {
-      reset();
-    }
-
-    @Override
-    public void onOutput() {
-      outputsPerElement++;
-    }
-
-    @Override
-    public void onProcessElementSuccess() {
-      counter.addValue(outputsPerElement);
-      reset();
-    }
-
-    private void reset() {
-      outputsPerElement = 0L;
-    }
-  }
-
-  /** No-op {@link OutputsPerElementTracker} implementation used when the 
counter is disabled. */
-  private static class NoopOutputsPerElementTracker implements 
OutputsPerElementTracker {
-
-    private NoopOutputsPerElementTracker() {}
-
-    public static final OutputsPerElementTracker INSTANCE = new 
NoopOutputsPerElementTracker();
-
-    @Override
-    public void onOutput() {}
-
-    @Override
-    public void onProcessElement() {}
-
-    @Override
-    public void onProcessElementSuccess() {}
+  boolean hasState() {
+    return fnSignature != null && !fnSignature.stateDeclarations().isEmpty();
   }
 
-  @Override
-  public void startBundle(Receiver... receivers) throws Exception {
+  void startBundle(Receiver... receivers) throws Exception {
     checkArgument(
         receivers.length == outputTupleTagsToReceiverIndices.size(),
         "unexpected number of receivers for DoFn");
 
     this.receivers = receivers;
-    if (hasStreamingSideInput) {
-      // There is non-trivial setup that needs to be performed for watermark 
propagation
-      // even on empty bundles.
-      reallyStartBundle();
-    }
   }
 
-  private void reallyStartBundle() throws Exception {
+  void reallyStartBundle() throws Exception {
     checkState(fnRunner == null, "bundle already started (or not properly 
finished)");
 
     WindowedValueMultiReceiver outputManager =
@@ -317,56 +230,103 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
             outputManager,
             doFnSchemaInformation,
             sideInputMapping);
-
     fnRunner.startBundle();
   }
 
-  @Override
-  @SuppressWarnings("unchecked")
-  public void processElement(Object untypedElem) throws Exception {
-    if (fnRunner == null) {
+  void finishBundle(StreamingSideInputProcessor<?, ?> sideInputProcessor) 
throws Exception {
+    if (fnRunner != null) {
+      fnRunner.finishBundle();
+      if (sideInputProcessor != null) {
+        sideInputProcessor.handleFinishBundle();
+      }
+      doFnInstanceManager.complete(fnInfo);
+      fnRunner = null;
+      fnInfo = null;
+      fnSignature = null;
+      sideInputProcessor = null;
+    }
+  }
+
+  void abort() throws Exception {
+    doFnInstanceManager.abort(fnInfo);
+    fnRunner = null;
+    fnInfo = null;
+  }
+
+  @VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer";
+
+  enum TimerType {
+    USER {
+      @Override
+      public void processTimer(
+          SimpleParDoFnHelpers doFn,
+          TimerInternals.TimerData timer,
+          Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+          throws Exception {
+        doFn.processUserTimer(timer, sideInputProcessor.get());
+      }
+    },
+    FAIL_USER {
+      @Override
+      public void processTimer(
+          SimpleParDoFnHelpers doFn,
+          TimerInternals.TimerData timer,
+          Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+          throws Exception {
+        throw new UnsupportedOperationException(
+            "Attempt to deliver a timer to a DoFn, but timers are not 
supported here.");
+      }
+    },
+    SYSTEM {
+      @Override
+      public void processTimer(
+          SimpleParDoFnHelpers doFn,
+          TimerInternals.TimerData timer,
+          Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+          throws Exception {
+        doFn.processSystemTimer(timer, sideInputProcessor.get());
+      }
+    };
+
+    public abstract void processTimer(
+        SimpleParDoFnHelpers doFn,
+        TimerInternals.TimerData timer,
+        Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+        throws Exception;
+  };
+
+  void processTimers(
+      TimerType mode,
+      DataflowExecutionContext.DataflowStepContext context,
+      Coder<BoundedWindow> windowCoder,
+      Runnable startKey,
+      Supplier<StreamingSideInputProcessor<?, ?>> sideInputProcessor)
+      throws Exception {
+    TimerInternals.TimerData timer = context.getNextFiredTimer(windowCoder);
+
+    if (timer != null && fnRunner == null) {
       // If we need to run reallyStartBundle in here, we need to make sure to 
switch the state
       // sampler into the start state.
       try (Closeable start = operationContext.enterStart()) {
         reallyStartBundle();
+        startKey.run();
       }
     }
 
-    WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
-
-    if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
-      registerStateCleanup(
-          (WindowingStrategy<?, BoundedWindow>) 
getDoFnInfo().getWindowingStrategy(),
-          (Collection<BoundedWindow>) elem.getWindows());
+    while (timer != null) {
+      mode.processTimer(this, timer, sideInputProcessor);
+      timer = context.getNextFiredTimer(windowCoder);
     }
-
-    outputsPerElementTracker.onProcessElement();
-    fnRunner.processElement(elem);
-    outputsPerElementTracker.onProcessElementSuccess();
-  }
-
-  @Override
-  public void processTimers() throws Exception {
-
-    // Note: We need to get windowCoder to decode the timers.  If we haven't 
already deserialized
-    // the fnInfo, we peek at a new instance to retrieve that. If this extra 
deserialization becomes
-    // excessively costly, we could either (1) have the DoFnInstanceManager 
remember the associated
-    // windowCoder (allowing us to get it without a DoFnInfo instance) or (2) 
check whether timers
-    // exist without actually decoding them.
-    Coder<BoundedWindow> windowCoder =
-        (Coder<BoundedWindow>)
-            (fnInfo != null ? fnInfo : doFnInstanceManager.peek())
-                .getWindowingStrategy()
-                .getWindowFn()
-                .windowCoder();
-    processTimers(TimerType.USER, userStepContext, windowCoder);
-    processTimers(TimerType.SYSTEM, stepContext, windowCoder);
   }
 
-  private void processUserTimer(TimerData timer) throws Exception {
+  protected void processUserTimer(
+      TimerInternals.TimerData timer, StreamingSideInputProcessor<InputT, W> 
sideInputProcessor) {
     if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())
         || 
fnSignature.timerFamilyDeclarations().containsKey(timer.getTimerFamilyId())) {
-      BoundedWindow window = ((WindowNamespace) 
timer.getNamespace()).getWindow();
+      BoundedWindow window = ((StateNamespaces.WindowNamespace) 
timer.getNamespace()).getWindow();
+      if (sideInputProcessor != null) {
+        sideInputProcessor.handleProcessTimer(timer);
+      }
       fnRunner.onTimer(
           timer.getTimerId(),
           timer.getTimerFamilyId(),
@@ -379,8 +339,9 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
     }
   }
 
-  private void processSystemTimer(TimerData timer) throws Exception {
-
+  private void processSystemTimer(
+      TimerInternals.TimerData timer, StreamingSideInputProcessor<InputT, W> 
sideInputProcessor)
+      throws Exception {
     // Timer owned by this class, for cleaning up state in expired windows
     if (timer.getTimerId().equals(CLEANUP_TIMER_ID)) {
       checkState(
@@ -390,13 +351,20 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
           timer);
 
       checkState(
-          timer.getNamespace() instanceof WindowNamespace,
+          timer.getNamespace() instanceof StateNamespaces.WindowNamespace,
           "%s received cleanup timer not for a %s: %s",
           this,
-          WindowNamespace.class.getSimpleName(),
+          StateNamespaces.WindowNamespace.class.getSimpleName(),
           timer);
 
-      BoundedWindow window = ((WindowNamespace) 
timer.getNamespace()).getWindow();
+      if (sideInputProcessor != null) {
+        // We must call this to ensure the side-input is cached for 
onWindowExpiration. Since we
+        // don't set cleanup
+        // timers until we actually call processElement, the window must be 
unblocked here.
+        sideInputProcessor.handleProcessTimer(timer);
+      }
+
+      BoundedWindow window = ((StateNamespaces.WindowNamespace) 
timer.getNamespace()).getWindow();
       Instant targetTime = earliestAllowableCleanupTime(window, 
fnInfo.getWindowingStrategy());
 
       checkState(
@@ -411,7 +379,7 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
           window, timer.getOutputTimestamp(), 
this.stepContext.stateInternals().getKey());
 
       // This is for a timer for a window that is expired, so clean it up.
-      for (StateDeclaration stateDecl : 
fnSignature.stateDeclarations().values()) {
+      for (DoFnSignature.StateDeclaration stateDecl : 
fnSignature.stateDeclarations().values()) {
         StateTag<?> tag;
         try {
           tag =
@@ -426,71 +394,100 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
         }
 
         StateInternals stateInternals = userStepContext.stateInternals();
-        org.apache.beam.sdk.state.State state = 
stateInternals.state(timer.getNamespace(), tag);
+        State state = stateInternals.state(timer.getNamespace(), tag);
         state.clear();
       }
     }
   }
 
-  @Override
-  public void finishBundle() throws Exception {
-    if (fnRunner != null) {
-      fnRunner.finishBundle();
-      doFnInstanceManager.complete(fnInfo);
-      fnRunner = null;
-      fnInfo = null;
-      fnSignature = null;
+  private OutputsPerElementTracker createOutputsPerElementTracker() {
+    // TODO: Remove once Distributions has shipped.
+    if (!hasExperiment(OUTPUTS_PER_ELEMENT_EXPERIMENT)) {
+      return NoopOutputsPerElementTracker.INSTANCE;
     }
+
+    // TODO: Remove log statement when functionality is enabled by default.
+    LOG.info("{} counter enabled.", COUNTER_NAME);
+
+    return new OutputsPerElementTrackerImpl();
   }
 
-  @Override
-  public void abort() throws Exception {
-    doFnInstanceManager.abort(fnInfo);
-    fnRunner = null;
-    fnInfo = null;
+  private boolean hasExperiment(String experiment) {
+    List<String> experiments = 
options.as(DataflowPipelineDebugOptions.class).getExperiments();
+    return experiments != null && experiments.contains(experiment);
   }
 
-  @VisibleForTesting static final String CLEANUP_TIMER_ID = "cleanup-timer";
+  /** Simple state tracker to calculate PerElementOutputCount counter. */
+  interface OutputsPerElementTracker {
 
-  private enum TimerType {
-    USER {
-      @Override
-      public void processTimer(SimpleParDoFn doFn, TimerData timer) throws 
Exception {
-        doFn.processUserTimer(timer);
-      }
-    },
-    SYSTEM {
-      @Override
-      public void processTimer(SimpleParDoFn doFn, TimerData timer) throws 
Exception {
-        doFn.processSystemTimer(timer);
-      }
-    };
+    void onOutput();
 
-    public abstract void processTimer(SimpleParDoFn doFn, TimerData timer) 
throws Exception;
-  };
+    void onProcessElement();
 
-  private void processTimers(
-      TimerType mode,
-      DataflowExecutionContext.DataflowStepContext context,
-      Coder<BoundedWindow> windowCoder)
-      throws Exception {
-    TimerData timer = context.getNextFiredTimer(windowCoder);
+    void onProcessElementSuccess();
+  }
 
-    if (timer != null && fnRunner == null) {
-      // If we need to run reallyStartBundle in here, we need to make sure to 
switch the state
-      // sampler into the start state.
-      try (Closeable start = operationContext.enterStart()) {
-        reallyStartBundle();
-      }
+  private class OutputsPerElementTrackerImpl implements 
OutputsPerElementTracker {
+
+    private long outputsPerElement;
+    private final Counter<Long, CounterFactory.CounterDistribution> counter;
+
+    public OutputsPerElementTrackerImpl() {
+      this.counter =
+          counterFactory.distribution(
+              
CounterName.named(COUNTER_NAME).withOriginalName(stepContext.getNameContext()));
     }
 
-    while (timer != null) {
-      mode.processTimer(this, timer);
-      timer = context.getNextFiredTimer(windowCoder);
+    @Override
+    public void onProcessElement() {
+      reset();
+    }
+
+    @Override
+    public void onOutput() {
+      outputsPerElement++;
+    }
+
+    @Override
+    public void onProcessElementSuccess() {
+      counter.addValue(outputsPerElement);
+      reset();
+    }
+
+    private void reset() {
+      outputsPerElement = 0L;
     }
   }
 
-  private <W extends BoundedWindow> void registerStateCleanup(
+  /** No-op {@link OutputsPerElementTracker} implementation used when the 
counter is disabled. */
+  private static class NoopOutputsPerElementTracker implements 
OutputsPerElementTracker {
+
+    private NoopOutputsPerElementTracker() {}
+
+    public static final OutputsPerElementTracker INSTANCE = new 
NoopOutputsPerElementTracker();
+
+    @Override
+    public void onOutput() {}
+
+    @Override
+    public void onProcessElement() {}
+
+    @Override
+    public void onProcessElementSuccess() {}
+  }
+
+  Instant earliestAllowableCleanupTime(BoundedWindow window, WindowingStrategy 
windowingStrategy) {
+    Instant cleanupTime =
+        window
+            .maxTimestamp()
+            .plus(windowingStrategy.getAllowedLateness())
+            .plus(Duration.millis(1L));
+    return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
+        ? BoundedWindow.TIMESTAMP_MAX_VALUE
+        : cleanupTime;
+  }
+
+  protected void registerStateCleanup(
       WindowingStrategy<?, W> windowingStrategy, Collection<W> 
windowsToCleanup) {
     Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
 
@@ -518,28 +515,4 @@ public class SimpleParDoFn<InputT, OutputT> implements 
ParDoFn {
       }
     }
   }
-
-  private Instant earliestAllowableCleanupTime(
-      BoundedWindow window, WindowingStrategy windowingStrategy) {
-    Instant cleanupTime =
-        window
-            .maxTimestamp()
-            .plus(windowingStrategy.getAllowedLateness())
-            .plus(Duration.millis(1L));
-    return cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)
-        ? BoundedWindow.TIMESTAMP_MAX_VALUE
-        : cleanupTime;
-  }
-
-  /**
-   * Returns the {@link DoFnInfo} currently being used by this {@link 
SimpleParDoFn}.
-   *
-   * <p>May be null if no element has been processed yet, or if the {@link 
SimpleParDoFn} has
-   * finished.
-   */
-  @VisibleForTesting
-  @Nullable
-  DoFnInfo<?, ?> getDoFnInfo() {
-    return fnInfo;
-  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
index 3ad443ee2a2..b55d73cf579 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.util.DoFnInfo;
@@ -65,7 +64,8 @@ import org.joda.time.Duration;
 })
 class SplittableProcessFnFactory {
   static final ParDoFnFactory createDefault() {
-    return new UserParDoFnFactory(new ProcessFnExtractor(), new 
SplittableDoFnRunnerFactory());
+    return new UserParDoFnFactory(
+        new ProcessFnExtractor(), new SplittableDoFnRunnerFactory(), true);
   }
 
   private static class ProcessFnExtractor implements 
UserParDoFnFactory.DoFnExtractor {
@@ -174,22 +174,6 @@ class SplittableProcessFnFactory {
               sideInputMapping);
       DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> 
fnRunner =
           new DataflowProcessFnRunner<>(simpleRunner);
-      boolean hasStreamingSideInput =
-          options.as(StreamingOptions.class).isStreaming() && 
!sideInputReader.isEmpty();
-      KeyedWorkItemCoder<byte[], KV<InputT, RestrictionT>> kwiCoder =
-          (KeyedWorkItemCoder<byte[], KV<InputT, RestrictionT>>) inputCoder;
-      if (hasStreamingSideInput) {
-        fnRunner =
-            new StreamingKeyedWorkItemSideInputDoFnRunner<>(
-                fnRunner,
-                ByteArrayCoder.of(),
-                new StreamingSideInputFetcher<>(
-                    sideInputViews,
-                    kwiCoder.getElementCoder(),
-                    processFn.getInputWindowingStrategy(),
-                    (StreamingModeExecutionContext.StreamingModeStepContext) 
userStepContext),
-                userStepContext);
-      }
       return fnRunner;
     }
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
new file mode 100644
index 00000000000..225bc6af0ea
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFn.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.api.client.util.Lists;
+import com.google.common.collect.Iterables;
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.ValueState;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@SuppressWarnings({
+  "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+/* Similar to {@link SimpleParDoFn} but for splittable ProcessFns. */
+public class StreamingKeyedWorkItemSideInputParDoFn<K, InputT, OutputT, W 
extends BoundedWindow>
+    implements ParDoFn {
+  private final StateTag<ValueState<K>> keyAddr;
+  private final Coder<InputT> inputCoder;
+  private final SimpleParDoFnHelpers<KeyedWorkItem<K, InputT>, OutputT, W> 
helpers;
+  protected @Nullable StreamingSideInputProcessor<InputT, W> 
sideInputProcessor;
+
+  StreamingKeyedWorkItemSideInputParDoFn(
+      PipelineOptions options,
+      DoFnInstanceManager doFnInstanceManager,
+      SideInputReader sideInputReader,
+      TupleTag<OutputT> mainOutputTag,
+      Map<TupleTag<?>, Integer> outputTupleTagsToReceiverIndices,
+      DataflowExecutionContext.DataflowStepContext stepContext,
+      DataflowOperationContext operationContext,
+      DoFnSchemaInformation doFnSchemaInformation,
+      Map<String, PCollectionView<?>> sideInputMapping,
+      DoFnRunnerFactory runnerFactory,
+      Coder<K> keyCoder,
+      Coder<InputT> inputCoder) {
+    helpers =
+        new SimpleParDoFnHelpers<>(
+            options,
+            doFnInstanceManager,
+            sideInputReader,
+            mainOutputTag,
+            outputTupleTagsToReceiverIndices,
+            stepContext,
+            operationContext,
+            doFnSchemaInformation,
+            sideInputMapping,
+            runnerFactory);
+    this.keyAddr = StateTags.makeSystemTagInternal(StateTags.value("key", 
keyCoder));
+    this.inputCoder = inputCoder;
+  }
+
+  ValueState<K> keyValue() {
+    return 
helpers.stepContext.stateInternals().state(StateNamespaces.global(), keyAddr);
+  }
+
+  @Override
+  public void startBundle(Receiver... receivers) throws Exception {
+    helpers.startBundle(receivers);
+    if (helpers.hasStreamingSideInput) {
+      // There is non-trivial setup that needs to be performed for watermark 
propagation
+      // even on empty bundles.
+      helpers.reallyStartBundle();
+      onStartKey();
+    }
+  }
+
+  protected void onStartKey() {
+    if (helpers.hasStreamingSideInput) {
+      sideInputProcessor =
+          new StreamingSideInputProcessor<>(
+              new StreamingSideInputFetcher<InputT, W>(
+                  helpers.fnInfo.getSideInputViews(),
+                  inputCoder,
+                  (WindowingStrategy<?, W>) 
helpers.fnInfo.getWindowingStrategy(),
+                  (StreamingModeExecutionContext.StreamingModeStepContext)
+                      helpers.userStepContext));
+    }
+
+    if (sideInputProcessor != null) {
+      boolean hasState = helpers.hasState();
+
+      // TODO(relax): We should be able to get this without writing it to 
state!
+      @Nullable K key = keyValue().read();
+      if (key != null) {
+        sideInputProcessor.tryUnblockElementsAndTimers(
+            (unblockedElements, unblockedTimers) -> {
+              if (!Iterables.isEmpty(unblockedElements) || 
!Iterables.isEmpty(unblockedTimers)) {
+                helpers.fnRunner.processElement(
+                    new ValueInEmptyWindows<>(
+                        KeyedWorkItems.workItem(key, unblockedTimers, 
unblockedElements)));
+              }
+              if (hasState) {
+                List<W> windows =
+                    (List<W>)
+                        StreamSupport.stream(unblockedElements.spliterator(), 
false)
+                            .flatMap(wv -> wv.getWindows().stream())
+                            .collect(Collectors.toList());
+                // These elements are now processed. Register cleanup timers 
for all the unblocked
+                // windows.
+                helpers.registerStateCleanup(
+                    (WindowingStrategy<?, W>) 
getDoFnInfo().getWindowingStrategy(), windows);
+              }
+            });
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void processElement(Object untypedElem) throws Exception {
+    if (helpers.fnRunner == null) {
+      // If we need to run reallyStartBundle in here, we need to make sure to 
switch the state
+      // sampler into the start state.
+      try (Closeable start = helpers.operationContext.enterStart()) {
+        helpers.reallyStartBundle();
+        onStartKey();
+      }
+    }
+    helpers.outputsPerElementTracker.onProcessElement();
+
+    WindowedValue<KeyedWorkItem<K, InputT>> elem =
+        (WindowedValue<KeyedWorkItem<K, InputT>>) untypedElem;
+    onProcessWindowedValue(elem);
+
+    helpers.outputsPerElementTracker.onProcessElementSuccess();
+  }
+
+  @Override
+  public void processTimers() throws Exception {
+
+    // Note: We need to get windowCoder to decode the timers.  If we haven't 
already deserialized
+    // the fnInfo, we peek at a new instance to retrieve that. If this extra 
deserialization becomes
+    // excessively costly, we could either (1) have the DoFnInstanceManager 
remember the associated
+    // windowCoder (allowing us to get it without a DoFnInfo instance) or (2) 
check whether timers
+    // exist without actually decoding them.
+    Coder<BoundedWindow> windowCoder =
+        (Coder<BoundedWindow>)
+            (helpers.fnInfo != null ? helpers.fnInfo : 
helpers.doFnInstanceManager.peek())
+                .getWindowingStrategy()
+                .getWindowFn()
+                .windowCoder();
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.FAIL_USER,
+        helpers.userStepContext,
+        windowCoder,
+        this::onStartKey,
+        () -> sideInputProcessor);
+    helpers.processTimers(
+        SimpleParDoFnHelpers.TimerType.SYSTEM,
+        helpers.stepContext,
+        windowCoder,
+        this::onStartKey,
+        () -> sideInputProcessor);
+  }
+
+  @Override
+  public void finishBundle() throws Exception {
+    helpers.finishBundle(sideInputProcessor);
+    this.sideInputProcessor = null;
+  }
+
+  @Override
+  public void abort() throws Exception {
+    helpers.abort();
+  }
+
+  protected void onProcessWindowedValue(WindowedValue<KeyedWorkItem<K, 
InputT>> elem) {
+    // TODO: Get rid of this!
+    final K key = elem.getValue().key();
+    keyValue().write(key);
+
+    boolean hasState = helpers.hasState();
+    Collection<W> windowsProcessed;
+    if (sideInputProcessor != null) {
+      windowsProcessed = hasState ? Lists.newArrayList() : 
Collections.emptyList();
+      WindowedValue<KeyedWorkItem<K, InputT>> unblocked =
+          sideInputProcessor.handleProcessKeyedWorkItem(elem);
+      if (!Iterables.isEmpty(unblocked.getValue().elementsIterable())
+          || !Iterables.isEmpty(unblocked.getValue().timersIterable())) {
+        helpers.fnRunner.processElement(unblocked);
+      }
+      if (hasState) {
+        windowsProcessed.addAll((Collection<W>) unblocked.getWindows());
+      }
+    } else {
+      helpers.fnRunner.processElement(elem);
+      windowsProcessed = (Collection<W>) elem.getWindows();
+    }
+    if (hasState) {
+      helpers.registerStateCleanup(
+          (WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(), 
windowsProcessed);
+    }
+  }
+
+  /**
+   * Returns the {@link DoFnInfo} currently being used by this {@link 
SimpleParDoFn}.
+   *
+   * <p>May be null if no element has been processed yet, or if the {@link 
SimpleParDoFn} has
+   * finished.
+   */
+  @VisibleForTesting
+  @Nullable
+  DoFnInfo<?, ?> getDoFnInfo() {
+    return helpers.fnInfo;
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
index 3b7891c5378..a64d1a970d3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
@@ -17,9 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.worker;
 
-import java.util.Set;
+import java.util.Iterator;
 import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.sdk.state.BagState;
 import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -37,43 +36,33 @@ import org.joda.time.Instant;
 public class StreamingSideInputDoFnRunner<InputT, OutputT, W extends 
BoundedWindow>
     implements DoFnRunner<InputT, OutputT> {
   private final DoFnRunner<InputT, OutputT> simpleDoFnRunner;
-  private final StreamingSideInputFetcher<InputT, W> sideInputFetcher;
+  private final StreamingSideInputProcessor<InputT, W> sideInputProcessor;
 
   public StreamingSideInputDoFnRunner(
       DoFnRunner<InputT, OutputT> simpleDoFnRunner,
       StreamingSideInputFetcher<InputT, W> sideInputFetcher) {
     this.simpleDoFnRunner = simpleDoFnRunner;
-    this.sideInputFetcher = sideInputFetcher;
+    this.sideInputProcessor = new 
StreamingSideInputProcessor<>(sideInputFetcher);
   }
 
   @Override
   public void startBundle() {
     simpleDoFnRunner.startBundle();
-    sideInputFetcher.prefetchBlockedMap();
-
-    // Find the set of ready windows.
-    Set<W> readyWindows = sideInputFetcher.getReadyWindows();
-
-    Iterable<BagState<WindowedValue<InputT>>> elementsBags =
-        sideInputFetcher.prefetchElements(readyWindows);
-
-    // Run the DoFn code now that all side inputs are ready.
-    for (BagState<WindowedValue<InputT>> elementsBag : elementsBags) {
-      Iterable<WindowedValue<InputT>> elements = elementsBag.read();
-      for (WindowedValue<InputT> elem : elements) {
-        simpleDoFnRunner.processElement(elem);
-      }
-      elementsBag.clear();
-    }
-    sideInputFetcher.releaseBlockedWindows(readyWindows);
+    sideInputProcessor.tryUnblockElements(
+        unblocked -> {
+          for (WindowedValue<InputT> elem : unblocked) {
+            simpleDoFnRunner.processElement(elem);
+          }
+        });
   }
 
   @Override
   public void processElement(WindowedValue<InputT> compressedElem) {
-    for (WindowedValue<InputT> elem : compressedElem.explodeWindows()) {
-      if (!sideInputFetcher.storeIfBlocked(elem)) {
-        simpleDoFnRunner.processElement(elem);
-      }
+    for (Iterator<? extends WindowedValue<InputT>> it =
+            sideInputProcessor.handleProcessElement(compressedElem);
+        it.hasNext(); ) {
+      WindowedValue<InputT> elem = it.next();
+      simpleDoFnRunner.processElement(elem);
     }
   }
 
@@ -94,7 +83,7 @@ public class StreamingSideInputDoFnRunner<InputT, OutputT, W 
extends BoundedWind
   @Override
   public void finishBundle() {
     simpleDoFnRunner.finishBundle();
-    sideInputFetcher.persist();
+    sideInputProcessor.handleFinishBundle();
   }
 
   @Override
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
index 76913baa6aa..0369b82be73 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.dataflow.worker;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -83,7 +84,6 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
     this.stepContext = stepContext;
 
     this.mainWindowCoder = windowingStrategy.getWindowFn().windowCoder();
-
     this.sideInputViews = new HashMap<>();
     for (PCollectionView<?> view : views) {
       sideInputViews.put(view.getTagInternal().getId(), view);
@@ -188,11 +188,7 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
     return timers;
   }
 
-  /** Compute the set of side inputs that are not yet ready for the given main 
input window. */
-  public boolean storeIfBlocked(WindowedValue<InputT> elem) {
-    @SuppressWarnings("unchecked")
-    W window = (W) Iterables.getOnlyElement(elem.getWindows());
-
+  private Set<Windmill.GlobalDataRequest> checkIfBlocked(W window) {
     Set<Windmill.GlobalDataRequest> blocked = blockedMap().get(window);
     if (blocked == null) {
       for (PCollectionView<?> view : sideInputViews.values()) {
@@ -205,7 +201,16 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
         }
       }
     }
-    if (blocked != null) {
+    return blocked == null ? Collections.emptySet() : blocked;
+  }
+
+  /** Compute the set of side inputs that are not yet ready for the given main 
input window. */
+  public boolean storeIfBlocked(WindowedValue<InputT> elem) {
+    @SuppressWarnings("unchecked")
+    W window = (W) Iterables.getOnlyElement(elem.getWindows());
+
+    Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
+    if (!blocked.isEmpty()) {
       elementBag(window).add(elem);
       watermarkHold(window).add(elem.getTimestamp());
       stepContext.addBlockingSideInputs(blocked);
@@ -223,17 +228,12 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
     @SuppressWarnings("unchecked")
     WindowNamespace<W> windowNamespace = (WindowNamespace<W>) 
timer.getNamespace();
     W window = windowNamespace.getWindow();
-
-    boolean blocked = false;
-    for (PCollectionView<?> view : sideInputViews.values()) {
-      if (!stepContext.issueSideInputFetch(view, window, 
SideInputState.UNKNOWN)) {
-        blocked = true;
-      }
-    }
-    if (blocked) {
+    Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
+    if (!blocked.isEmpty()) {
       timerBag(window).add(timer);
+      return true;
     }
-    return blocked;
+    return false;
   }
 
   public void persist() {
@@ -332,7 +332,7 @@ public class StreamingSideInputFetcher<InputT, W extends 
BoundedWindow> {
         .build();
   }
 
-  private static class GlobalDataRequestCoder extends 
AtomicCoder<GlobalDataRequest> {
+  static class GlobalDataRequestCoder extends AtomicCoder<GlobalDataRequest> {
     private final Class<Windmill.GlobalDataRequest> protoMessageClass =
         Windmill.GlobalDataRequest.class;
     private transient Parser<Windmill.GlobalDataRequest> memoizedParser;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java
new file mode 100644
index 00000000000..34c1a06d54d
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowedValue;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+
+/** Helper class for handling elements blocked on side inputs. */
+@SuppressWarnings("nullness" // 
TODO(https://github.com/apache/beam/issues/20497)
+)
+class StreamingSideInputProcessor<InputT, W extends BoundedWindow> {
+  private final StreamingSideInputFetcher<InputT, W> sideInputFetcher;
+
+  public StreamingSideInputProcessor(StreamingSideInputFetcher<InputT, W> 
sideInputFetcher) {
+    this.sideInputFetcher = sideInputFetcher;
+  }
+
+  /**
+   * Handle's startBundle. If there are unblocked elements, process them and 
then return the set of
+   * windows that were unblocked.
+   */
+  void tryUnblockElements(Consumer<Iterable<WindowedValue<InputT>>> consumer) {
+    sideInputFetcher.prefetchBlockedMap();
+
+    // Find the set of ready windows.
+    Set<W> readyWindows = sideInputFetcher.getReadyWindows();
+
+    Iterable<BagState<WindowedValue<InputT>>> elementBags =
+        sideInputFetcher.prefetchElements(readyWindows);
+    Iterable<WindowedValue<InputT>> releasedElements =
+        Iterables.concat(Iterables.transform(elementBags, BagState::read));
+    consumer.accept(releasedElements);
+    elementBags.forEach(BagState::clear);
+    sideInputFetcher.releaseBlockedWindows(readyWindows);
+  }
+
+  void tryUnblockElementsAndTimers(
+      BiConsumer<Iterable<WindowedValue<InputT>>, 
Iterable<TimerInternals.TimerData>> consumer) {
+    sideInputFetcher.prefetchBlockedMap();
+
+    // Find the set of ready windows.
+    Set<W> readyWindows = sideInputFetcher.getReadyWindows();
+
+    Iterable<BagState<TimerInternals.TimerData>> timerBags =
+        sideInputFetcher.prefetchTimers(readyWindows);
+    Iterable<TimerInternals.TimerData> releasedTimers =
+        Iterables.concat(
+            Iterables.transform(sideInputFetcher.prefetchTimers(readyWindows), 
BagState::read));
+    Iterable<BagState<WindowedValue<InputT>>> elementBags =
+        sideInputFetcher.prefetchElements(readyWindows);
+    Iterable<WindowedValue<InputT>> releasedElements =
+        Iterables.concat(Iterables.transform(elementBags, BagState::read));
+
+    consumer.accept(releasedElements, releasedTimers);
+    timerBags.forEach(BagState::clear);
+    elementBags.forEach(BagState::clear);
+    sideInputFetcher.releaseBlockedWindows(readyWindows);
+  }
+
+  void handleFinishBundle() {
+    sideInputFetcher.persist();
+  }
+
+  /*
+  Handle process element. Runs the elements that have an available side input, 
and buffers elements for which the
+  side input is blocked. Returns the list of elements that are unblocked and 
should be processed.
+   */
+  Iterator<? extends WindowedValue<InputT>> handleProcessElement(
+      WindowedValue<InputT> compressedElem) {
+    // Note: We could write this as a three-line stream expression, but side 
effects are discouraged
+    // in Java streams.
+    return Iterators.filter(
+        compressedElem.explodeWindows().iterator(),
+        (WindowedValue<InputT> e) -> !sideInputFetcher.storeIfBlocked(e));
+  }
+
+  <K> WindowedValue<KeyedWorkItem<K, InputT>> handleProcessKeyedWorkItem(
+      WindowedValue<KeyedWorkItem<K, InputT>> elem) {
+    List<WindowedValue<InputT>> readyInputs =
+        Lists.newArrayList(
+            Iterables.filter(
+                elem.getValue().elementsIterable(),
+                input -> !sideInputFetcher.storeIfBlocked(input)));
+
+    List<TimerInternals.TimerData> readyTimers =
+        Lists.newArrayList(
+            Iterables.filter(
+                elem.getValue().timersIterable(),
+                timer -> !sideInputFetcher.storeIfBlocked(timer)));
+    KeyedWorkItem<K, InputT> keyedWorkItem =
+        KeyedWorkItems.workItem(elem.getValue().key(), readyTimers, 
readyInputs);
+
+    return elem.withValue(keyedWorkItem);
+  }
+
+  void handleProcessTimer(TimerInternals.TimerData timer) {
+    // We must call this to ensure the side-input is cached for the timer. 
However since a user
+    // timer can only
+    // be set via element processing (or another timer) in the same window, 
the window should be
+    // unblocked once
+    // we get here.
+    Preconditions.checkState(!sideInputFetcher.storeIfBlocked(timer));
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java
index a8d5975e45e..9466ad60d41 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactory.java
@@ -24,18 +24,22 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 import com.google.api.services.dataflow.model.SideInputInfo;
 import java.util.List;
 import java.util.Map;
+import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.dataflow.BatchStatefulParDoOverrides;
 import org.apache.beam.runners.dataflow.DataflowRunner;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.util.PropertyNames;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.DoFnInfo;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
@@ -52,7 +56,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 })
 class UserParDoFnFactory implements ParDoFnFactory {
   static UserParDoFnFactory createDefault() {
-    return new UserParDoFnFactory(new UserDoFnExtractor(), 
SimpleDoFnRunnerFactory.INSTANCE);
+    return new UserParDoFnFactory(new UserDoFnExtractor(), 
SimpleDoFnRunnerFactory.INSTANCE, false);
   }
 
   interface DoFnExtractor {
@@ -74,10 +78,15 @@ class UserParDoFnFactory implements ParDoFnFactory {
 
   private final DoFnExtractor doFnExtractor;
   private final DoFnRunnerFactory runnerFactory;
+  private final boolean streamingKeyedWorkItem;
 
-  UserParDoFnFactory(DoFnExtractor doFnExtractor, DoFnRunnerFactory 
runnerFactory) {
+  UserParDoFnFactory(
+      DoFnExtractor doFnExtractor,
+      DoFnRunnerFactory runnerFactory,
+      boolean streamingKeyedWorkItem) {
     this.doFnExtractor = doFnExtractor;
     this.runnerFactory = runnerFactory;
+    this.streamingKeyedWorkItem = streamingKeyedWorkItem;
   }
 
   @Override
@@ -144,17 +153,38 @@ class UserParDoFnFactory implements ParDoFnFactory {
           writerFn.getDataCoder(),
           (Coder<BoundedWindow>) 
doFnInfo.getWindowingStrategy().getWindowFn().windowCoder());
     } else {
-      return new SimpleParDoFn(
-          options,
-          instanceManager,
-          sideInputReader,
-          doFnInfo.getMainOutput(),
-          outputTupleTagsToReceiverIndices,
-          stepContext,
-          operationContext,
-          doFnInfo.getDoFnSchemaInformation(),
-          doFnInfo.getSideInputMapping(),
-          runnerFactory);
+      boolean hasStreamingSideInput =
+          options.as(StreamingOptions.class).isStreaming() && 
!sideInputReader.isEmpty();
+
+      if (streamingKeyedWorkItem && hasStreamingSideInput) {
+        KeyedWorkItemCoder<byte[], KV<?, ?>> kwiCoder =
+            (KeyedWorkItemCoder<byte[], KV<?, ?>>) doFnInfo.getInputCoder();
+        return new StreamingKeyedWorkItemSideInputParDoFn<>(
+            options,
+            instanceManager,
+            sideInputReader,
+            doFnInfo.getMainOutput(),
+            outputTupleTagsToReceiverIndices,
+            stepContext,
+            operationContext,
+            doFnInfo.getDoFnSchemaInformation(),
+            doFnInfo.getSideInputMapping(),
+            runnerFactory,
+            ByteArrayCoder.of(),
+            kwiCoder.getElementCoder());
+      } else {
+        return new SimpleParDoFn(
+            options,
+            instanceManager,
+            sideInputReader,
+            doFnInfo.getMainOutput(),
+            outputTupleTagsToReceiverIndices,
+            stepContext,
+            operationContext,
+            doFnInfo.getDoFnSchemaInformation(),
+            doFnInfo.getSideInputMapping(),
+            runnerFactory);
+      }
     }
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpersTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpersTest.java
new file mode 100644
index 00000000000..6bbbf953967
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnHelpersTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.SideInputReader;
+import 
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
+import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.values.TupleTag;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(JUnit4.class)
+public class SimpleParDoFnHelpersTest {
+  private PipelineOptions options;
+  @Mock DoFnInstanceManager doFnInstanceManager;
+  @Mock SideInputReader sideInputReader;
+  @Mock DataflowStepContext stepContext;
+  @Mock DataflowStepContext userStepContext;
+  @Mock DataflowOperationContext operationContext;
+  @Mock DoFnRunnerFactory<String, String> runnerFactory;
+  @Mock DoFnRunner<String, String> mockRunner;
+
+  @Mock StreamingSideInputProcessor<String, GlobalWindow> sideInputProcessor;
+
+  @Mock DoFnInfo<String, String> doFnInfo;
+  @Mock CounterFactory counterFactory;
+
+  private static class TestDoFn extends DoFn<String, String> {
+    @ProcessElement
+    public void processElement() {}
+  }
+
+  private TestDoFn doFn = new TestDoFn();
+
+  private SimpleParDoFnHelpers<String, String, GlobalWindow> helpers;
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setUp() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    options = PipelineOptionsFactory.create();
+    when(stepContext.namespacedToUser()).thenReturn(userStepContext);
+    when(operationContext.counterFactory()).thenReturn(counterFactory);
+
+    when(doFnInstanceManager.get()).thenReturn((DoFnInfo) doFnInfo);
+    when(doFnInfo.getDoFn()).thenReturn(doFn);
+
+    when(runnerFactory.createRunner(
+            any(), any(), any(), any(), any(), any(), any(), any(), any(), 
any(), any(), any(),
+            any(), any()))
+        .thenReturn(mockRunner);
+
+    helpers =
+        new SimpleParDoFnHelpers<>(
+            options,
+            doFnInstanceManager,
+            sideInputReader,
+            new TupleTag<>("main"),
+            ImmutableMap.of(new TupleTag<>("main"), 0),
+            stepContext,
+            operationContext,
+            DoFnSchemaInformation.create(),
+            ImmutableMap.of(),
+            runnerFactory);
+  }
+
+  @Test
+  public void testReallyStartBundle() throws Exception {
+    helpers.startBundle(mock(Receiver.class));
+    helpers.reallyStartBundle();
+
+    verify(runnerFactory)
+        .createRunner(
+            any(), any(), any(), any(), any(), any(), any(), any(), any(), 
any(), any(), any(),
+            any(), any());
+    verify(mockRunner).startBundle();
+  }
+
+  @Test
+  public void testFinishBundle() throws Exception {
+    helpers.startBundle(mock(Receiver.class));
+    helpers.reallyStartBundle();
+
+    helpers.finishBundle(sideInputProcessor);
+
+    verify(mockRunner).finishBundle();
+    verify(sideInputProcessor).handleFinishBundle();
+    verify(doFnInstanceManager).complete(any());
+  }
+
+  @Test
+  public void testAbort() throws Exception {
+    helpers.startBundle(mock(Receiver.class));
+    helpers.reallyStartBundle();
+
+    helpers.abort();
+
+    verify(doFnInstanceManager).abort(any());
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
index 9c9f5386f44..5b1d7a8d366 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
@@ -92,7 +92,7 @@ public class SimpleParDoFnTest {
     // TODO: Remove once Distributions has shipped.
     options
         .as(DataflowPipelineDebugOptions.class)
-        
.setExperiments(Lists.newArrayList(SimpleParDoFn.OUTPUTS_PER_ELEMENT_EXPERIMENT));
+        
.setExperiments(Lists.newArrayList(SimpleParDoFnHelpers.OUTPUTS_PER_ELEMENT_EXPERIMENT));
 
     operationContext = TestOperationContext.create();
     stepContext =
@@ -489,7 +489,7 @@ public class SimpleParDoFnTest {
       }
 
       @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
+      public void processElement() throws Exception {
         assertThat(startCalled, equalTo(true));
         assertThat(tracker.getCurrentState(), 
equalTo(operationContext.getProcessState()));
       }
@@ -558,7 +558,7 @@ public class SimpleParDoFnTest {
   public void testOutputsPerElementCounterDisabledViaExperiment() throws 
Exception {
     DataflowPipelineDebugOptions debugOptions = 
options.as(DataflowPipelineDebugOptions.class);
     List<String> experiments = debugOptions.getExperiments();
-    experiments.remove(SimpleParDoFn.OUTPUTS_PER_ELEMENT_EXPERIMENT);
+    experiments.remove(SimpleParDoFnHelpers.OUTPUTS_PER_ELEMENT_EXPERIMENT);
     debugOptions.setExperiments(experiments);
 
     List<CounterUpdate> counterUpdates = executeParDoFnCounterTest(0);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
index e12ddd95f91..654707aae91 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunnerTest.java
@@ -202,8 +202,7 @@ public class StreamingKeyedWorkItemSideInputDoFnRunnerTest {
             (WindowedValue<KV<String, Integer>> windowedValue) ->
                 outputManager.output(mainOutputTag, windowedValue),
             stepContext);
-    return new StreamingKeyedWorkItemSideInputDoFnRunner<
-        String, Integer, KV<String, Integer>, IntervalWindow>(
+    return new StreamingKeyedWorkItemSideInputDoFnRunner<>(
         simpleDoFnRunner, keyCoder, sideInputFetcher, stepContext);
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFnTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFnTest.java
new file mode 100644
index 00000000000..2fed6fd405a
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputParDoFnTest.java
@@ -0,0 +1,488 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.InMemoryStateInternals;
+import org.apache.beam.runners.core.KeyedWorkItem;
+import org.apache.beam.runners.core.KeyedWorkItems;
+import 
org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker;
+import org.apache.beam.runners.core.SideInputReader;
+import org.apache.beam.runners.core.SimpleDoFnRunner;
+import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems.ProcessFn;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import 
org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputState;
+import org.apache.beam.runners.dataflow.worker.util.ValueInEmptyWindows;
+import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.Timer;
+import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.DoFnInfo;
+import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link StreamingKeyedWorkItemSideInputParDoFn}. */
+@RunWith(JUnit4.class)
+public class StreamingKeyedWorkItemSideInputParDoFnTest {
+  private static final FixedWindows WINDOW_FN = 
FixedWindows.of(Duration.millis(10));
+  private static final TupleTag<KV<String, Integer>> MAIN_OUTPUT_TAG = new 
TupleTag<>();
+
+  private final InMemoryStateInternals<String> state = 
InMemoryStateInternals.forKey("a");
+
+  @Mock private StreamingModeExecutionContext.StepContext stepContext;
+  @Mock private TimerInternals mockTimerInternals;
+  @Mock private SideInputReader mockSideInputReader;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    when(stepContext.stateInternals()).thenReturn((StateInternals) state);
+    when(stepContext.timerInternals()).thenReturn(mockTimerInternals);
+    when(stepContext.namespacedToUser()).thenReturn(stepContext);
+    when(mockSideInputReader.isEmpty()).thenReturn(false);
+  }
+
+  @Test
+  public void testInvokeProcessElement() throws Exception {
+    PCollectionView<String> view = createView();
+
+    when(stepContext.issueSideInputFetch(
+            eq(view), any(BoundedWindow.class), eq(SideInputState.UNKNOWN)))
+        .thenReturn(false);
+    when(stepContext.issueSideInputFetch(
+            eq(view), any(BoundedWindow.class), 
eq(SideInputState.KNOWN_READY)))
+        .thenReturn(true);
+
+    
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(Instant.ofEpochMilli(15L));
+    StreamingKeyedWorkItemSideInputParDoFn<String, Integer, KV<String, 
Integer>, IntervalWindow>
+        runner = createRunner(view);
+
+    TestReceiver receiver = new TestReceiver();
+    runner.startBundle(receiver);
+
+    KeyedWorkItem<String, Integer> elemsWorkItem =
+        KeyedWorkItems.elementsWorkItem(
+            "a",
+            ImmutableList.of(
+                createDatum(13, 13L),
+                createDatum(16, 16L), // side inputs non-ready element
+                createDatum(18, 18L)));
+
+    runner.processElement(new ValueInEmptyWindows<>(elemsWorkItem));
+
+    // Initially blocked! No output.
+    assertEquals(0, receiver.outputs.size());
+
+    
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(Instant.ofEpochMilli(20));
+    runner.processElement(
+        new ValueInEmptyWindows<>(
+            KeyedWorkItems.<String, Integer>timersWorkItem(
+                "a",
+                ImmutableList.of(
+                    timerData(window(10, 20), Instant.ofEpochMilli(19), 
Timer.Type.WATERMARK)))));
+
+    // Timer is blocked too!
+    assertEquals(0, receiver.outputs.size());
+
+    // Now make it ready!
+    IntervalWindow readyWindow = window(10, 20);
+    Windmill.GlobalDataId id =
+        Windmill.GlobalDataId.newBuilder()
+            .setTag(view.getTagInternal().getId())
+            .setVersion(
+                ByteString.copyFrom(
+                    CoderUtils.encodeToByteArray(IntervalWindow.getCoder(), 
readyWindow)))
+            .build();
+
+    when(stepContext.getSideInputNotifications())
+        .thenReturn(Arrays.<Windmill.GlobalDataId>asList(id));
+
+    runner.finishBundle();
+
+    runner.startBundle(receiver);
+
+    // We don't check for output here because we just wanted to see if the 
runner works
+    // without exceptions. The issue was lifecycle of the runner bundle 
(finishBundle, startBundle).
+  }
+
+  static class TestSplittableDoFn extends DoFn<Integer, KV<String, Integer>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, RestrictionTracker<String, ?> 
tracker) {
+      c.output(KV.of(tracker.currentRestriction(), c.element()));
+    }
+
+    @GetInitialRestriction
+    public String getInitialRestriction(@Element Integer element) {
+      return "restriction";
+    }
+
+    @NewTracker
+    public RestrictionTracker<String, ?> newTracker(@Restriction String 
restriction) {
+      return new RestrictionTracker<String, Object>() {
+        @Override
+        public boolean tryClaim(Object position) {
+          return true;
+        }
+
+        @Override
+        public String currentRestriction() {
+          return restriction;
+        }
+
+        @Override
+        public SplitResult<String> trySplit(double fractionOfRemainder) {
+          return null;
+        }
+
+        @Override
+        public void checkDone() {}
+
+        @Override
+        public IsBounded isBounded() {
+          return IsBounded.BOUNDED;
+        }
+      };
+    }
+  }
+
+  @Test
+  public void testSplittableProcessElement() throws Exception {
+    PCollectionView<String> view = createView();
+
+    when(stepContext.issueSideInputFetch(
+            eq(view), any(BoundedWindow.class), eq(SideInputState.UNKNOWN)))
+        .thenReturn(false);
+    when(stepContext.issueSideInputFetch(
+            eq(view), any(BoundedWindow.class), 
eq(SideInputState.KNOWN_READY)))
+        .thenReturn(true);
+
+    
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(Instant.ofEpochMilli(15L));
+    StreamingKeyedWorkItemSideInputParDoFn<
+            byte[], KV<Integer, String>, KV<String, Integer>, IntervalWindow>
+        runner = createSplittableRunner(view);
+
+    TestReceiver receiver = new TestReceiver();
+    runner.startBundle(receiver);
+
+    KeyedWorkItem<byte[], KV<Integer, String>> elemsWorkItem =
+        KeyedWorkItems.elementsWorkItem(
+            new byte[] {1}, ImmutableList.of(createDatum(KV.of(13, 
"restriction"), 13L)));
+
+    runner.processElement(new ValueInEmptyWindows<>(elemsWorkItem));
+
+    // Initially blocked! No output.
+    assertEquals(0, receiver.outputs.size());
+    runner.finishBundle();
+
+    // Now make it ready!
+    IntervalWindow readyWindow = window(10, 20);
+    Windmill.GlobalDataId id =
+        Windmill.GlobalDataId.newBuilder()
+            .setTag(view.getTagInternal().getId())
+            .setVersion(
+                ByteString.copyFrom(
+                    CoderUtils.encodeToByteArray(IntervalWindow.getCoder(), 
readyWindow)))
+            .build();
+
+    when(stepContext.getSideInputNotifications())
+        .thenReturn(Arrays.<Windmill.GlobalDataId>asList(id));
+
+    runner.startBundle(receiver);
+
+    // Note: unblocking logic would run here if the environment is fully 
mocked to push
+    // blocked items back into processing. For the purpose of testing 
SplittableDoFn initialization,
+    // this suffices.
+  }
+
+  private <T> WindowedValue<T> createDatum(T element, long timestampMillis) {
+    Instant timestamp = Instant.ofEpochMilli(timestampMillis);
+    return WindowedValues.of(
+        element, timestamp, Arrays.asList(WINDOW_FN.assignWindow(timestamp)), 
PaneInfo.NO_FIRING);
+  }
+
+  private TimerData timerData(IntervalWindow window, Instant timestamp, 
Timer.Type type) {
+    return TimerData.of(
+        StateNamespaces.window(IntervalWindow.getCoder(), window),
+        timestamp,
+        timestamp,
+        type == Windmill.Timer.Type.WATERMARK ? TimeDomain.EVENT_TIME : 
TimeDomain.PROCESSING_TIME,
+        CausedByDrain.NORMAL);
+  }
+
+  private IntervalWindow window(long start, long end) {
+    return new IntervalWindow(Instant.ofEpochMilli(start), 
Instant.ofEpochMilli(end));
+  }
+
+  private PCollectionView<String> createView() {
+    return TestPipeline.create()
+        .apply(Create.empty(StringUtf8Coder.of()))
+        .apply(Window.<String>into(WINDOW_FN))
+        .apply(View.<String>asSingleton());
+  }
+
+  static class TestReceiver implements Receiver {
+    List<WindowedValue<KV<String, Integer>>> outputs = new ArrayList<>();
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void process(Object outputElem) {
+      outputs.add((WindowedValue<KV<String, Integer>>) outputElem);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private StreamingKeyedWorkItemSideInputParDoFn<
+          String, Integer, KV<String, Integer>, IntervalWindow>
+      createRunner(PCollectionView<String> view) throws Exception {
+    Coder<String> keyCoder = StringUtf8Coder.of();
+    Coder<Integer> inputCoder = BigEndianIntegerCoder.of();
+
+    WindowingStrategy<Object, IntervalWindow> windowingStrategy = 
WindowingStrategy.of(WINDOW_FN);
+
+    DoFn<KeyedWorkItem<String, Integer>, KV<String, Integer>> theDoFn =
+        new DoFn<KeyedWorkItem<String, Integer>, KV<String, Integer>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) {
+            KeyedWorkItem<String, Integer> kwi = c.element();
+            for (WindowedValue<Integer> wv : kwi.elementsIterable()) {
+              c.output(KV.of(kwi.key(), wv.getValue()));
+            }
+          }
+        };
+
+    DoFnInfo<KeyedWorkItem<String, Integer>, KV<String, Integer>> fnInfo =
+        DoFnInfo.forFn(
+            theDoFn,
+            windowingStrategy,
+            ImmutableList.of(view),
+            (Coder) null,
+            MAIN_OUTPUT_TAG,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    DoFnRunnerFactory<KeyedWorkItem<String, Integer>, KV<String, Integer>> 
runnerFactory =
+        new DoFnRunnerFactory<KeyedWorkItem<String, Integer>, KV<String, 
Integer>>() {
+          @Override
+          public DoFnRunner<KeyedWorkItem<String, Integer>, KV<String, 
Integer>> createRunner(
+              DoFn<KeyedWorkItem<String, Integer>, KV<String, Integer>> fn,
+              PipelineOptions options,
+              TupleTag<KV<String, Integer>> mainOutputTag,
+              List<TupleTag<?>> sideOutputTags,
+              Iterable<PCollectionView<?>> sideInputViews,
+              SideInputReader sideInputReader,
+              Coder<KeyedWorkItem<String, Integer>> inputCoder,
+              Map<TupleTag<?>, Coder<?>> outputCoders,
+              WindowingStrategy<?, ?> windowingStrategy,
+              DataflowExecutionContext.DataflowStepContext stepContext,
+              DataflowExecutionContext.DataflowStepContext userStepContext,
+              WindowedValueMultiReceiver outputManager2,
+              DoFnSchemaInformation doFnSchemaInformation,
+              Map<String, PCollectionView<?>> sideInputMapping) {
+            return new SimpleDoFnRunner<>(
+                options,
+                fn,
+                sideInputReader,
+                outputManager2,
+                mainOutputTag,
+                sideOutputTags,
+                stepContext,
+                inputCoder,
+                outputCoders,
+                windowingStrategy,
+                doFnSchemaInformation,
+                sideInputMapping);
+          }
+        };
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.as(StreamingOptions.class).setStreaming(true);
+
+    return new StreamingKeyedWorkItemSideInputParDoFn<>(
+        options,
+        DoFnInstanceManagers.singleInstance(fnInfo),
+        mockSideInputReader,
+        MAIN_OUTPUT_TAG,
+        ImmutableMap.of(MAIN_OUTPUT_TAG, 0),
+        stepContext,
+        TestOperationContext.create(),
+        DoFnSchemaInformation.create(),
+        Collections.emptyMap(),
+        runnerFactory,
+        keyCoder,
+        inputCoder);
+  }
+
+  @SuppressWarnings("unchecked")
+  private StreamingKeyedWorkItemSideInputParDoFn<
+          byte[], KV<Integer, String>, KV<String, Integer>, IntervalWindow>
+      createSplittableRunner(PCollectionView<String> view) throws Exception {
+    ByteArrayCoder keyCoder = ByteArrayCoder.of();
+    Coder<KV<Integer, String>> inputCoder =
+        KvCoder.of(BigEndianIntegerCoder.of(), StringUtf8Coder.of());
+
+    WindowingStrategy<Integer, IntervalWindow> windowingStrategy =
+        (WindowingStrategy) WindowingStrategy.of(WINDOW_FN);
+
+    TestSplittableDoFn theDoFn = new TestSplittableDoFn();
+
+    ProcessFn<Integer, KV<String, Integer>, String, Object, Object> processFn =
+        new ProcessFn<Integer, KV<String, Integer>, String, Object, Object>(
+            theDoFn,
+            BigEndianIntegerCoder.of(),
+            StringUtf8Coder.of(),
+            (Coder) StringUtf8Coder.of(), // watermarkEstimatorStateCoder
+            windowingStrategy,
+            Collections.emptyMap());
+    processFn.setup(PipelineOptionsFactory.create());
+
+    DoFnInfo<KeyedWorkItem<byte[], KV<Integer, String>>, KV<String, Integer>> 
fnInfo =
+        DoFnInfo.forFn(
+            processFn,
+            windowingStrategy,
+            ImmutableList.of(view),
+            (Coder) null,
+            MAIN_OUTPUT_TAG,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    DoFnRunnerFactory<KeyedWorkItem<byte[], KV<Integer, String>>, KV<String, 
Integer>>
+        runnerFactory =
+            new DoFnRunnerFactory<
+                KeyedWorkItem<byte[], KV<Integer, String>>, KV<String, 
Integer>>() {
+              @Override
+              public DoFnRunner<KeyedWorkItem<byte[], KV<Integer, String>>, 
KV<String, Integer>>
+                  createRunner(
+                      DoFn<KeyedWorkItem<byte[], KV<Integer, String>>, 
KV<String, Integer>> fn,
+                      PipelineOptions options,
+                      TupleTag<KV<String, Integer>> mainOutputTag,
+                      List<TupleTag<?>> sideOutputTags,
+                      Iterable<PCollectionView<?>> sideInputViews,
+                      SideInputReader sideInputReader,
+                      Coder<KeyedWorkItem<byte[], KV<Integer, String>>> 
inputCoder,
+                      Map<TupleTag<?>, Coder<?>> outputCoders,
+                      WindowingStrategy<?, ?> windowingStrategy,
+                      DataflowExecutionContext.DataflowStepContext stepContext,
+                      DataflowExecutionContext.DataflowStepContext 
userStepContext,
+                      WindowedValueMultiReceiver outputManager2,
+                      DoFnSchemaInformation doFnSchemaInformation,
+                      Map<String, PCollectionView<?>> sideInputMapping) {
+
+                ProcessFn<Integer, KV<String, Integer>, String, Object, 
Object> fn2 =
+                    (ProcessFn<Integer, KV<String, Integer>, String, Object, 
Object>) fn;
+                fn2.setStateInternalsFactory(key -> (StateInternals) 
stepContext.stateInternals());
+                fn2.setTimerInternalsFactory(key -> 
stepContext.timerInternals());
+                fn2.setSideInputReader(sideInputReader);
+                fn2.setProcessElementInvoker(
+                    new OutputAndTimeBoundedSplittableProcessElementInvoker<
+                        Integer, KV<String, Integer>, String, Object, Object>(
+                        fn2.getFn(),
+                        options,
+                        outputManager2,
+                        mainOutputTag,
+                        sideInputReader,
+                        Executors.newSingleThreadScheduledExecutor(),
+                        10000,
+                        Duration.standardSeconds(10),
+                        () -> null));
+
+                return new SimpleDoFnRunner<>(
+                    options,
+                    fn,
+                    sideInputReader,
+                    outputManager2,
+                    mainOutputTag,
+                    sideOutputTags,
+                    stepContext,
+                    inputCoder,
+                    outputCoders,
+                    windowingStrategy,
+                    doFnSchemaInformation,
+                    sideInputMapping);
+              }
+            };
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options.as(StreamingOptions.class).setStreaming(true);
+
+    return new StreamingKeyedWorkItemSideInputParDoFn<>(
+        options,
+        DoFnInstanceManagers.singleInstance(fnInfo),
+        mockSideInputReader,
+        MAIN_OUTPUT_TAG,
+        ImmutableMap.of(MAIN_OUTPUT_TAG, 0),
+        stepContext,
+        TestOperationContext.create(),
+        DoFnSchemaInformation.create(),
+        Collections.emptyMap(),
+        runnerFactory,
+        keyCoder,
+        inputCoder);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessorTest.java
new file mode 100644
index 00000000000..19e22b03883
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessorTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.emptyIterable;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.client.util.Lists;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.CausedByDrain;
+import org.apache.beam.sdk.values.WindowedValue;
+import org.apache.beam.sdk.values.WindowedValues;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests for {@link StreamingSideInputProcessor}. */
+@RunWith(JUnit4.class)
+public class StreamingSideInputProcessorTest {
+
+  @Mock private StreamingSideInputFetcher<String, IntervalWindow> mockFetcher;
+  private StreamingSideInputProcessor<String, IntervalWindow> processor;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    processor = new StreamingSideInputProcessor<>(mockFetcher);
+  }
+
+  @Test
+  public void testTryUnblockElementsNoReadyWindows() {
+    // Given
+    doNothing().when(mockFetcher).prefetchBlockedMap();
+    when(mockFetcher.getReadyWindows()).thenReturn(Collections.emptySet());
+
+    // When
+    processor.tryUnblockElements(unblocked -> assertThat(unblocked, 
emptyIterable()));
+
+    // Then
+    verify(mockFetcher).prefetchBlockedMap();
+    verify(mockFetcher).getReadyWindows();
+  }
+
+  @Test
+  public void testTryUnblockElementsWithReadyWindows() {
+    // Given
+    IntervalWindow window1 = new IntervalWindow(Instant.ofEpochMilli(0), 
Instant.ofEpochMilli(10));
+    IntervalWindow window2 = new IntervalWindow(Instant.ofEpochMilli(10), 
Instant.ofEpochMilli(20));
+    Set<IntervalWindow> readyWindows = new HashSet<>(Arrays.asList(window1, 
window2));
+
+    WindowedValue<String> element1 =
+        WindowedValues.of(
+            "e1", Instant.ofEpochMilli(5), Arrays.asList(window1), 
PaneInfo.NO_FIRING);
+    WindowedValue<String> element2 =
+        WindowedValues.of(
+            "e2", Instant.ofEpochMilli(15), Arrays.asList(window2), 
PaneInfo.NO_FIRING);
+
+    @SuppressWarnings("unchecked")
+    BagState<WindowedValue<String>> mockBag1 = mock(BagState.class);
+    @SuppressWarnings("unchecked")
+    BagState<WindowedValue<String>> mockBag2 = mock(BagState.class);
+
+    when(mockBag1.read()).thenReturn(Arrays.asList(element1));
+    when(mockBag2.read()).thenReturn(Arrays.asList(element2));
+
+    doNothing().when(mockFetcher).prefetchBlockedMap();
+    when(mockFetcher.getReadyWindows()).thenReturn(readyWindows);
+    
when(mockFetcher.prefetchElements(readyWindows)).thenReturn(Arrays.asList(mockBag1,
 mockBag2));
+    doNothing().when(mockFetcher).releaseBlockedWindows(readyWindows);
+
+    // When
+    processor.tryUnblockElements(
+        unblocked -> assertThat(unblocked, containsInAnyOrder(element1, 
element2)));
+
+    // Then
+    verify(mockFetcher).prefetchBlockedMap();
+    verify(mockFetcher).getReadyWindows();
+    verify(mockFetcher).prefetchElements(readyWindows);
+    verify(mockBag1).read();
+    verify(mockBag1).clear();
+    verify(mockBag2).read();
+    verify(mockBag2).clear();
+    verify(mockFetcher).releaseBlockedWindows(readyWindows);
+  }
+
+  @Test
+  public void testHandleFinishBundle() {
+    // Given
+    doNothing().when(mockFetcher).persist();
+
+    // When
+    processor.handleFinishBundle();
+
+    // Then
+    verify(mockFetcher).persist();
+  }
+
+  @Test
+  public void testHandleProcessElementBlocked() {
+    // Given
+    IntervalWindow window = new IntervalWindow(Instant.ofEpochMilli(0), 
Instant.ofEpochMilli(10));
+    WindowedValue<String> compressedElement =
+        WindowedValues.of("e", Instant.ofEpochMilli(5), Arrays.asList(window), 
PaneInfo.NO_FIRING);
+
+    
when(mockFetcher.storeIfBlocked(any(WindowedValue.class))).thenReturn(true);
+
+    // When
+    Iterator<? extends WindowedValue<String>> unblocked =
+        processor.handleProcessElement(compressedElement);
+
+    // Then
+    assertFalse(unblocked.hasNext());
+    for (WindowedValue<String> exploded : compressedElement.explodeWindows()) {
+      verify(mockFetcher).storeIfBlocked(exploded);
+    }
+  }
+
+  @Test
+  public void testHandleProcessElementUnblocked() {
+    // Given
+    IntervalWindow window1 = new IntervalWindow(Instant.ofEpochMilli(0), 
Instant.ofEpochMilli(10));
+    IntervalWindow window2 = new IntervalWindow(Instant.ofEpochMilli(10), 
Instant.ofEpochMilli(20));
+    WindowedValue<String> compressedElement =
+        WindowedValues.of(
+            "e", Instant.ofEpochMilli(5), Arrays.asList(window1, window2), 
PaneInfo.NO_FIRING);
+
+    
when(mockFetcher.storeIfBlocked(any(WindowedValue.class))).thenReturn(false);
+
+    // When
+    Iterator<? extends WindowedValue<String>> unblocked =
+        processor.handleProcessElement(compressedElement);
+    // Then
+    assertThat(
+        Lists.newArrayList(unblocked),
+        containsInAnyOrder(
+            Iterables.toArray(compressedElement.explodeWindows(), 
WindowedValue.class)));
+    for (WindowedValue<String> exploded : compressedElement.explodeWindows()) {
+      verify(mockFetcher).storeIfBlocked(exploded);
+    }
+  }
+
+  @Test
+  public void testHandleProcessTimerSuccess() {
+    // Given
+    TimerData testTimer =
+        TimerData.of(
+            StateNamespaces.global(),
+            Instant.ofEpochMilli(1000),
+            Instant.ofEpochMilli(2000),
+            TimeDomain.EVENT_TIME,
+            CausedByDrain.NORMAL);
+    when(mockFetcher.storeIfBlocked(testTimer)).thenReturn(false);
+
+    // When
+    processor.handleProcessTimer(testTimer);
+
+    // Then
+    verify(mockFetcher).storeIfBlocked(testTimer);
+  }
+
+  @Test
+  public void testHandleProcessTimerThrowsPreconditionFail() {
+    // Given
+    TimerData testTimer =
+        TimerData.of(
+            StateNamespaces.global(),
+            Instant.ofEpochMilli(1000),
+            Instant.ofEpochMilli(2000),
+            TimeDomain.EVENT_TIME,
+            CausedByDrain.NORMAL);
+    when(mockFetcher.storeIfBlocked(testTimer)).thenReturn(true);
+
+    // When & Then
+    assertThrows(IllegalStateException.class, () -> 
processor.handleProcessTimer(testTimer));
+    verify(mockFetcher).storeIfBlocked(testTimer);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
index 9d3fa9b211b..43331d11a7e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/UserParDoFnFactoryTest.java
@@ -323,10 +323,6 @@ public class UserParDoFnFactoryTest {
 
   private CloudObject getCloudObject(DoFn<?, ?> fn, WindowingStrategy<?, ?> 
windowingStrategy) {
     CloudObject object = CloudObject.forClassName("DoFn");
-    @SuppressWarnings({
-      "rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
-      "unchecked"
-    })
     DoFnInfo<?, ?> info =
         DoFnInfo.forFn(
             fn,
@@ -377,13 +373,14 @@ public class UserParDoFnFactoryTest {
     Receiver rcvr = new OutputReceiver();
     parDoFn.startBundle(rcvr);
 
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(10));
+    IntervalWindow firstWindow =
+        new IntervalWindow(Instant.ofEpochMilli(0), Instant.ofEpochMilli(10));
     parDoFn.processElement(
-        WindowedValues.of("foo", new Instant(1), firstWindow, 
PaneInfo.NO_FIRING));
+        WindowedValues.of("foo", Instant.ofEpochMilli(1), firstWindow, 
PaneInfo.NO_FIRING));
 
     verify(stepContext)
         .setStateCleanupTimer(
-            SimpleParDoFn.CLEANUP_TIMER_ID,
+            SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
             firstWindow,
             IntervalWindow.getCoder(),
             firstWindow.maxTimestamp().plus(Duration.millis(1L)),
@@ -436,14 +433,14 @@ public class UserParDoFnFactoryTest {
 
     GlobalWindow globalWindow = GlobalWindow.INSTANCE;
     parDoFn.processElement(
-        WindowedValues.of("foo", new Instant(1), globalWindow, 
PaneInfo.NO_FIRING));
+        WindowedValues.of("foo", Instant.ofEpochMilli(1), globalWindow, 
PaneInfo.NO_FIRING));
 
     assertThat(
         globalWindow.maxTimestamp().plus(allowedLateness),
         greaterThan(BoundedWindow.TIMESTAMP_MAX_VALUE));
     verify(stepContext)
         .setStateCleanupTimer(
-            SimpleParDoFn.CLEANUP_TIMER_ID,
+            SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
             globalWindow,
             GlobalWindow.Coder.INSTANCE,
             BoundedWindow.TIMESTAMP_MAX_VALUE,
@@ -459,7 +456,7 @@ public class UserParDoFnFactoryTest {
     when(stepContext.getNextFiredTimer((Coder) GlobalWindow.Coder.INSTANCE))
         .thenReturn(
             TimerData.of(
-                SimpleParDoFn.CLEANUP_TIMER_ID,
+                SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
                 globalWindowNamespace,
                 BoundedWindow.TIMESTAMP_MAX_VALUE,
                 BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1)),
@@ -516,8 +513,10 @@ public class UserParDoFnFactoryTest {
     Receiver rcvr = new OutputReceiver();
     parDoFn.startBundle(rcvr);
 
-    IntervalWindow firstWindow = new IntervalWindow(new Instant(0), new 
Instant(9));
-    IntervalWindow secondWindow = new IntervalWindow(new Instant(10), new 
Instant(19));
+    IntervalWindow firstWindow =
+        new IntervalWindow(Instant.ofEpochMilli(0), Instant.ofEpochMilli(9));
+    IntervalWindow secondWindow =
+        new IntervalWindow(Instant.ofEpochMilli(10), Instant.ofEpochMilli(19));
 
     Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
     StateNamespace firstWindowNamespace = StateNamespaces.window(windowCoder, 
firstWindow);
@@ -535,7 +534,7 @@ public class UserParDoFnFactoryTest {
     when(stepContext.getNextFiredTimer(windowCoder))
         .thenReturn(
             TimerData.of(
-                SimpleParDoFn.CLEANUP_TIMER_ID,
+                SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
                 firstWindowNamespace,
                 firstWindow.maxTimestamp().plus(Duration.millis(1L)),
                 firstWindow.maxTimestamp().plus(Duration.millis(1L)),
@@ -552,7 +551,7 @@ public class UserParDoFnFactoryTest {
     when(stepContext.getNextFiredTimer((Coder) windowCoder))
         .thenReturn(
             TimerData.of(
-                SimpleParDoFn.CLEANUP_TIMER_ID,
+                SimpleParDoFnHelpers.CLEANUP_TIMER_ID,
                 secondWindowNamespace,
                 secondWindow.maxTimestamp().plus(Duration.millis(1L)),
                 secondWindow.maxTimestamp().plus(Duration.millis(1L)),
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index 9357515f36c..03049f29331 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -162,6 +162,9 @@ def sickbayTests = [
     // java.lang.IllegalStateException: java.io.EOFException
     'org.apache.beam.sdk.transforms.ViewTest.testSideInputWithNestedIterables',
 
+    // Triggers index-out-of-bound error in Prism
+    'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testTimerSideInput',
+
     // Missing output due to processing time timer skew.
     
'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testProcessElementSkew',
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputsInTimer.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputsInTimer.java
new file mode 100644
index 00000000000..8320c1451d1
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSideInputsInTimer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.testing;
+
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Category tag for validation tests which use sideinputs in OnTimer and 
OnWindowExpiration. Tests
+ * tagged with {@link UsesSideInputsInTimer} should be run for runners which 
support sideinputs.
+ */
+@Internal
+public class UsesSideInputsInTimer {}
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index a366ded4fe2..bfd04908b99 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -359,6 +359,14 @@ public abstract class DoFn<InputT extends @Nullable 
Object, OutputT extends @Nul
     /** Returns the time domain of the current timer. */
     public abstract TimeDomain timeDomain();
 
+    /**
+     * Returns the value of the side input.
+     *
+     * @throws IllegalArgumentException if this is not a side input
+     */
+    @Pure
+    public abstract <T> T sideInput(PCollectionView<T> view);
+
     @Pure
     public abstract org.apache.beam.sdk.values.CausedByDrain causedByDrain();
   }
@@ -368,6 +376,14 @@ public abstract class DoFn<InputT extends @Nullable 
Object, OutputT extends @Nul
     /** Returns the window in which the window expiration is firing. */
     @Pure
     public abstract BoundedWindow window();
+
+    /**
+     * Returns the value of the side input.
+     *
+     * @throws IllegalArgumentException if this is not a side input
+     */
+    @Pure
+    public abstract <T> T sideInput(PCollectionView<T> view);
   }
 
   /**
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index 0bd2c1c888f..2983fc94021 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -197,7 +197,8 @@ public class DoFnSignatures {
           Parameter.TimerIdParameter.class,
           Parameter.FireTimestampParameter.class,
           Parameter.CausedByDrainParameter.class,
-          Parameter.KeyParameter.class);
+          Parameter.KeyParameter.class,
+          Parameter.SideInputParameter.class);
 
   private static final ImmutableList<Class<? extends Parameter>>
       ALLOWED_ON_TIMER_FAMILY_PARAMETERS =
@@ -215,7 +216,8 @@ public class DoFnSignatures {
               Parameter.TimerIdParameter.class,
               Parameter.FireTimestampParameter.class,
               Parameter.CausedByDrainParameter.class,
-              Parameter.KeyParameter.class);
+              Parameter.KeyParameter.class,
+              Parameter.SideInputParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_ON_WINDOW_EXPIRATION_PARAMETERS =
@@ -226,7 +228,8 @@ public class DoFnSignatures {
               Parameter.TaggedOutputReceiverParameter.class,
               Parameter.StateParameter.class,
               Parameter.TimestampParameter.class,
-              Parameter.KeyParameter.class);
+              Parameter.KeyParameter.class,
+              Parameter.SideInputParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_GET_INITIAL_RESTRICTION_PARAMETERS =
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index 8a273127b4f..0c984d01c8f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -114,6 +114,7 @@ import org.apache.beam.sdk.testing.UsesProcessingTimeTimers;
 import org.apache.beam.sdk.testing.UsesRequiresTimeSortedInput;
 import org.apache.beam.sdk.testing.UsesSetState;
 import org.apache.beam.sdk.testing.UsesSideInputs;
+import org.apache.beam.sdk.testing.UsesSideInputsInTimer;
 import org.apache.beam.sdk.testing.UsesSideInputsWithDifferentCoders;
 import org.apache.beam.sdk.testing.UsesStatefulParDo;
 import org.apache.beam.sdk.testing.UsesStrictTimerOrdering;
@@ -3678,6 +3679,154 @@ public class ParDoTest implements Serializable {
       pipeline.run();
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesSideInputs.class,
+      UsesSideInputsInTimer.class,
+      UsesTestStream.class,
+      UsesTimersInParDo.class,
+      UsesTriggeredSideInputs.class,
+      UsesOnWindowExpiration.class
+    })
+    public void testTimerSideInput() {
+      // SideInput tag id
+      final String sideInputTag1 = "tag1";
+
+      final PCollectionView<Integer> sideInput =
+          pipeline
+              .apply("CreateSideInput1", Create.of(2))
+              .apply("ViewSideInput1", View.asSingleton());
+
+      DoFn<KV<Integer, Integer>, KV<Integer, Integer>> doFn =
+          new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
+            @TimerId("timer")
+            private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @StateId("foo")
+            private final StateSpec<ValueState<Integer>> stateSpec = 
StateSpecs.value();
+
+            @ProcessElement
+            public void process(@Timestamp Instant ts, @TimerId("timer") Timer 
timer) {
+              timer.align(Duration.standardSeconds(10)).setRelative();
+            }
+
+            @OnTimer("timer")
+            public void onTimer(
+                OutputReceiver<KV<Integer, Integer>> o,
+                @DoFn.SideInput(sideInputTag1) Integer sideInput,
+                @Key Integer key) {
+              o.output(KV.of(key, sideInput));
+            }
+
+            @OnWindowExpiration
+            public void onWindowExpiration(
+                @DoFn.SideInput(sideInputTag1) Integer sideInput,
+                OutputReceiver<KV<Integer, Integer>> o,
+                @Key Integer key) {
+              o.output(KV.of(key, sideInput));
+            }
+          };
+
+      final int numTestElements = 10;
+      final Instant now = new Instant(0);
+      TestStream.Builder<KV<Integer, Integer>> builder =
+          TestStream.create(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0));
+
+      for (int i = 0; i < numTestElements; i++) {
+        builder =
+            builder.addElements(
+                TimestampedValue.of(KV.of(i % 2, i), 
now.plus(Duration.millis(i * 1000))));
+        if ((i + 1) % 10 == 0) {
+          builder = builder.advanceWatermarkTo(now.plus(Duration.millis((i + 
1) * 1000)));
+        }
+      }
+      List<KV<Integer, Integer>> expected =
+          IntStream.rangeClosed(0, 1)
+              .boxed()
+              .flatMap(i -> ImmutableList.of(KV.of(i, 2), KV.of(i, 
2)).stream())
+              .collect(Collectors.toList());
+
+      PCollection<KV<Integer, Integer>> output =
+          pipeline
+              .apply(builder.advanceWatermarkToInfinity())
+              .apply(ParDo.of(doFn).withSideInput(sideInputTag1, sideInput));
+      PAssert.that(output).containsInAnyOrder(expected);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesSideInputs.class,
+      UsesSideInputsInTimer.class,
+      UsesTimersInParDo.class,
+      UsesTriggeredSideInputs.class
+    })
+    public void testSideInputNotReadyTimer() {
+      final String sideInputTag = "tag1";
+
+      // Create a side input that is delayed by 5 seconds using Thread.sleep
+      DoFn<KV<String, String>, String> delayFn =
+          new DoFn<KV<String, String>, String>() {
+            @ProcessElement
+            public void process(OutputReceiver<String> o) throws 
InterruptedException {
+              Thread.sleep(java.time.Duration.ofSeconds(15).toMillis());
+              o.output("side-value");
+            }
+          };
+
+      PCollectionView<String> sideInput =
+          pipeline
+              .apply("CreateSideSource", Create.of(KV.of("dummyKey", "")))
+              .apply("DelaySideInput", ParDo.of(delayFn))
+              .apply(View.asSingleton());
+
+      // Main input in global window
+      DoFn<KV<String, String>, String> fn =
+          new DoFn<KV<String, String>, String>() {
+            @TimerId("timer")
+            private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @StateId("dummy")
+            private final StateSpec<ValueState<Integer>> dummy = 
StateSpecs.value();
+
+            @ProcessElement
+            public void process(
+                @Timestamp Instant ts,
+                @TimerId("timer") Timer timer,
+                @DoFn.SideInput(sideInputTag) String sideInputValue,
+                OutputReceiver<String> o) {
+              // Set timer to fire at current timestamp + 1 millis
+              timer.offset(Duration.millis(1)).setRelative();
+              o.output(sideInputValue);
+            }
+
+            @OnTimer("timer")
+            public void onTimer(
+                OutputReceiver<String> o, @DoFn.SideInput(sideInputTag) String 
sideInputValue) {
+              o.output(sideInputValue);
+            }
+
+            @OnWindowExpiration
+            public void onWindowExpiration(
+                OutputReceiver<String> o, @DoFn.SideInput(sideInputTag) String 
sideInputValue) {
+              o.output(sideInputValue);
+            }
+          };
+
+      PCollection<String> output =
+          pipeline
+              .apply("CreateMainKV", Create.of(KV.of("key", "main-elem")))
+              .apply(ParDo.of(fn).withSideInput(sideInputTag, sideInput));
+
+      PAssert.that(output).containsInAnyOrder("side-value", "side-value", 
"side-value");
+      pipeline.run();
+    }
+
     @Test
     @Category({
       ValidatesRunner.class,
diff --git 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index d92a84ea9ff..3e4675ab074 100644
--- 
a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ 
b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -2362,6 +2362,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
         return currentWindow;
       }
 
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        return stateAccessor.get(view, currentWindow);
+      }
+
       @Override
       public OutputBuilder<OutputT> builder(OutputT value) {
         return WindowedValues.<OutputT>builder()
@@ -2489,6 +2494,15 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       return (K) currentTimer.getUserKey();
     }
 
+    @Override
+    public @Nullable Object sideInput(String tagId) {
+      PCollectionView<?> view = sideInputMapping.get(tagId);
+      if (view == null) {
+        throw new IllegalArgumentException("Unknown side input: " + tagId);
+      }
+      return stateAccessor.get(view, currentWindow);
+    }
+
     @Override
     public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
       return context;
@@ -2649,6 +2663,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
         return currentWindow;
       }
 
+      @Override
+      public <T> T sideInput(PCollectionView<T> view) {
+        return stateAccessor.get(view, currentWindow);
+      }
+
       @Override
       public CausedByDrain causedByDrain() {
         return causedByDrain;
@@ -2800,6 +2819,15 @@ public class FnApiDoFnRunner<InputT, RestrictionT, 
PositionT, WatermarkEstimator
       return currentTimer.getFireTimestamp();
     }
 
+    @Override
+    public @Nullable Object sideInput(String tagId) {
+      PCollectionView<?> view = sideInputMapping.get(tagId);
+      if (view == null) {
+        throw new IllegalArgumentException("Unknown side input: " + tagId);
+      }
+      return stateAccessor.get(view, currentWindow);
+    }
+
     @Override
     public K key() {
       return (K) currentTimer.getUserKey();


Reply via email to