This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 2209bd60122 Adds bundleFinalizer support to Dataflow non-portable 
worker. (#37723)
2209bd60122 is described below

commit 2209bd6012287dadbf18d0e5fd521de1d7a2739d
Author: Andrew Crites <[email protected]>
AuthorDate: Fri Mar 20 01:49:42 2026 -0700

    Adds bundleFinalizer support to Dataflow non-portable worker. (#37723)
    
    * Adds bundleFinalizer support to non-portable worker.
    * Removes check preventing stateful DoFns with bundle finalizers from
running on the Dataflow streaming non-portable worker when using Streaming Engine
---
 ...m_PostCommit_Java_ValidatesRunner_Dataflow.json |   4 +-
 ...it_Java_ValidatesRunner_Dataflow_Streaming.json |   2 +-
 .../apache/beam/runners/core/SimpleDoFnRunner.java |  16 +-
 runners/google-cloud-dataflow-java/build.gradle    |   7 +-
 .../beam/runners/dataflow/DataflowRunner.java      |   6 +-
 .../worker/SplittableProcessFnFactory.java         |   5 +-
 .../worker/StreamingModeExecutionContext.java      |  74 +++++++--
 .../client/grpc/GetWorkResponseChunkAssembler.java |   9 +-
 .../work/processing/StreamingCommitFinalizer.java  | 178 +++++++++++++++++----
 .../work/processing/StreamingWorkScheduler.java    |   1 +
 .../runners/dataflow/worker/SimpleParDoFnTest.java | 111 ++++++++++++-
 .../processing/StreamingCommitFinalizerTest.java   | 177 ++++++++++++++++++++
 .../worker/windmill/src/main/proto/windmill.proto  |  17 +-
 13 files changed, 544 insertions(+), 63 deletions(-)

diff --git 
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json 
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
index a89f7adb4ce..e9d869cc508 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run!",
-  "modification": 3,
+  "modification": 1,
 }
- 
+
diff --git 
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json
 
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json
index e623d3373a9..0c41d2bcf2f 100644
--- 
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json
+++ 
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to 
run!",
-  "modification":  1,
+  "modification":  6,
 }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 74f5a4d0900..db24215e32c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -311,9 +311,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     public String getErrorContext() {
       return "SimpleDoFnRunner/StartBundle";
     }
+
+    @Override
+    public BundleFinalizer bundleFinalizer() {
+      return stepContext.bundleFinalizer();
+    }
   }
 
-  /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.StartBundle 
@StartBundle}. */
+  /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.FinishBundle 
@FinishBundle}. */
   private class DoFnFinishBundleArgumentProvider
       extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
     /** A concrete implementation of {@link DoFn.FinishBundleContext}. */
@@ -356,6 +361,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     public String getErrorContext() {
       return "SimpleDoFnRunner/FinishBundle";
     }
+
+    @Override
+    public BundleFinalizer bundleFinalizer() {
+      return stepContext.bundleFinalizer();
+    }
   }
 
   /**
@@ -1030,7 +1040,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     @Override
     public BundleFinalizer bundleFinalizer() {
       throw new UnsupportedOperationException(
-          "Bundle finalization is not supported in non-portable pipelines.");
+          "Bundle finalization is not supported in OnTimer calls.");
     }
   }
 
@@ -1289,7 +1299,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
     @Override
     public BundleFinalizer bundleFinalizer() {
       throw new UnsupportedOperationException(
-          "Bundle finalization is not supported in non-portable pipelines.");
+          "Bundle finalization is not supported in OnWindowExpiration calls.");
     }
   }
 
diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index 49de59fac32..652c72c323e 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [
   'org.apache.beam.sdk.testing.UsesGaugeMetrics',
   'org.apache.beam.sdk.testing.UsesTestStream',
   'org.apache.beam.sdk.testing.UsesMetricsPusher',
-  'org.apache.beam.sdk.testing.UsesBundleFinalizer',
   'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of 
now does not support returning back BoundedTrie in metric result.
 ]
 
@@ -456,7 +455,9 @@ task validatesRunner {
       
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
       
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
       
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
-      ]
+    ],
+    // Batch legacy worker does not support bundle finalization.
+    excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
   ))
 }
 
@@ -490,6 +491,8 @@ task validatesRunnerStreaming {
   description "Validates Dataflow runner forcing streaming mode"
   
dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig 
+ [
     name: 'validatesRunnerLegacyWorkerTestStreaming',
+    // Streaming appliance currently fails bundle finalizer tests.
+    excludedCategories: validatesRunnerStreamingConfig.excludedCategories + [ 
'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
   ]))
 }
 
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9d963af0ecb..8bda9c20763 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2737,11 +2737,11 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
               DataflowRunner.class.getSimpleName()));
     }
     boolean isUnifiedWorker = useUnifiedWorker(options);
-    if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker) {
+    if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker && 
!streaming) {
       throw new UnsupportedOperationException(
           String.format(
-              "%s does not currently support %s when not using unified worker 
because it uses "
-                  + "BundleFinalizers in its implementation. Set the 
`--experiments=use_runner_v2` "
+              "%s does not currently support %s in batch mode when not using 
unified worker because it "
+                  + "uses BundleFinalizers in its implementation. Set the 
`--experiments=use_runner_v2` "
                   + "option to use this DoFn.",
               DataflowRunner.class.getSimpleName(), 
fn.getClass().getSimpleName()));
     }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
index 4473dff8e94..93c288fea9e 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
@@ -156,10 +156,7 @@ class SplittableProcessFnFactory {
               // in the event of a crash.
               10000,
               Duration.standardSeconds(10),
-              () -> {
-                throw new UnsupportedOperationException(
-                    "BundleFinalizer unsupported by non-portable Dataflow.");
-              }));
+              stepContext::bundleFinalizer));
       DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT> 
simpleRunner =
           new SimpleDoFnRunner<>(
               options,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index b4568877143..f75d452b211 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -35,6 +35,7 @@ import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.StateInternals;
 import org.apache.beam.runners.core.StateNamespace;
@@ -73,6 +74,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.metrics.MetricsContainer;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.ByteStringOutputStream;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -82,8 +84,10 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -446,11 +450,27 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     }
   }
 
-  public Map<Long, Runnable> flushState() {
-    Map<Long, Runnable> callbacks = new HashMap<>();
+  public Map<Long, Pair<Instant, Runnable>> flushState() {
+    Map<Long, Pair<Instant, Runnable>> callbacks = new HashMap<>();
 
     for (StepContext stepContext : getAllStepContexts()) {
       stepContext.flushState();
+      for (Pair<Instant, BundleFinalizer.Callback> bundleFinalizer :
+          stepContext.flushBundleFinalizerCallbacks()) {
+        long id = ThreadLocalRandom.current().nextLong();
+        callbacks.put(
+            id,
+            Pair.of(
+                bundleFinalizer.getLeft(),
+                () -> {
+                  try {
+                    bundleFinalizer.getRight().onBundleSuccess();
+                  } catch (Exception e) {
+                    throw new RuntimeException("Exception while running bundle 
finalizer", e);
+                  }
+                }));
+        outputBuilder.addFinalizeIds(id);
+      }
     }
 
     if (activeReader != null) {
@@ -462,13 +482,15 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       sourceStateBuilder.addFinalizeIds(id);
       callbacks.put(
           id,
-          () -> {
-            try {
-              checkpointMark.finalizeCheckpoint();
-            } catch (IOException e) {
-              throw new RuntimeException("Exception while finalizing 
checkpoint", e);
-            }
-          });
+          Pair.of(
+              Instant.now().plus(Duration.standardMinutes(5)),
+              () -> {
+                try {
+                  checkpointMark.finalizeCheckpoint();
+                } catch (IOException e) {
+                  throw new RuntimeException("Exception while finalizing 
checkpoint", e);
+                }
+              }));
 
       @SuppressWarnings("unchecked")
       Coder<UnboundedSource.CheckpointMark> checkpointCoder =
@@ -699,6 +721,11 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     public DataflowStepContext namespacedToUser() {
       return this;
     }
+
+    @Override
+    public BundleFinalizer bundleFinalizer() {
+      return wrapped.bundleFinalizer();
+    }
   }
 
   /** A {@link SideInputReader} that fetches side inputs from the streaming 
worker's cache. */
@@ -771,6 +798,7 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     // A list of timer keys that were modified by user processing earlier in 
this bundle. This
     // serves a tombstone, so that we know not to fire any bundle timers that 
were modified.
     private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = 
null;
+    private final WindmillBundleFinalizer bundleFinalizer = new 
WindmillBundleFinalizer();
 
     public StepContext(DataflowOperationContext operationContext) {
       super(operationContext.nameContext());
@@ -1043,9 +1071,37 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
       return checkNotNull(systemTimerInternals);
     }
 
+    @Override
+    public BundleFinalizer bundleFinalizer() {
+      return bundleFinalizer;
+    }
+
     public TimerInternals userTimerInternals() {
       ensureStateful("Tried to access user timers");
       return checkNotNull(userTimerInternals);
     }
+
+    public ImmutableList<Pair<Instant, BundleFinalizer.Callback>> 
flushBundleFinalizerCallbacks() {
+      return bundleFinalizer.flushCallbacks();
+    }
+  }
+
+  private static class WindmillBundleFinalizer implements BundleFinalizer {
+    private ImmutableList.Builder<Pair<Instant, Callback>> callbacks = 
ImmutableList.builder();
+
+    private WindmillBundleFinalizer() {}
+
+    private ImmutableList<Pair<Instant, Callback>> flushCallbacks() {
+      ImmutableList<Pair<Instant, Callback>> flushedCallbacks = 
callbacks.build();
+      if (!Iterables.isEmpty(flushedCallbacks)) {
+        callbacks = ImmutableList.builder();
+      }
+      return flushedCallbacks;
+    }
+
+    @Override
+    public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
+      callbacks.add(Pair.of(callbackExpiry, callback));
+    }
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
index 3608bd1ccac..4afee8a70df 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
@@ -54,6 +54,7 @@ final class GetWorkResponseChunkAssembler {
   private final WorkItem.Builder workItemBuilder; // Reused to reduce GC 
overhead.
   private ByteString data;
   private long bufferedSize;
+  private final List<Long> appliedFinalizeIds;
 
   GetWorkResponseChunkAssembler() {
     workTimingInfosTracker = new 
GetWorkTimingInfosTracker(System::currentTimeMillis);
@@ -61,10 +62,11 @@ final class GetWorkResponseChunkAssembler {
     bufferedSize = 0;
     metadata = null;
     workItemBuilder = WorkItem.newBuilder();
+    appliedFinalizeIds = new ArrayList<>();
   }
 
   /**
-   * Appends the response chunk bytes to the {@link #data }byte buffer. Return 
the assembled
+   * Appends the response chunk bytes to the {@link #data} byte buffer. Return 
the assembled
    * WorkItem if all response chunks for a WorkItem have been received.
    */
   List<AssembledWorkItem> append(Windmill.StreamingGetWorkResponseChunk chunk) 
{
@@ -72,6 +74,7 @@ final class GetWorkResponseChunkAssembler {
       metadata = ComputationMetadata.fromProto(chunk.getComputationMetadata());
     }
     
workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList());
+    appliedFinalizeIds.addAll(chunk.getAppliedFinalizeIdsList());
 
     List<AssembledWorkItem> response = new ArrayList<>();
     for (int i = 0; i < chunk.getSerializedWorkItemList().size(); i++) {
@@ -90,13 +93,14 @@ final class GetWorkResponseChunkAssembler {
   }
 
   /**
-   * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ it's 
metadata. Resets the
+   * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ its 
metadata. Resets the
    * data byte string and tracking metadata afterwards, whether the {@link 
WorkItem} deserialization
    * was successful or not.
    */
   private Optional<AssembledWorkItem> flushToWorkItem() {
     try {
       workItemBuilder.mergeFrom(data);
+      workItemBuilder.addAllAppliedFinalizeIds(appliedFinalizeIds);
       return Optional.of(
           AssembledWorkItem.create(
               workItemBuilder.build(),
@@ -110,6 +114,7 @@ final class GetWorkResponseChunkAssembler {
       workTimingInfosTracker.reset();
       data = ByteString.EMPTY;
       bufferedSize = 0;
+      appliedFinalizeIds.clear();
     }
 
     return Optional.empty();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
index 7116aed3a2b..4266f11f50c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
@@ -17,14 +17,28 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
 
-import java.time.Duration;
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,28 +46,110 @@ import org.slf4j.LoggerFactory;
 @Internal
 final class StreamingCommitFinalizer {
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamingCommitFinalizer.class);
-  private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY = 
Duration.ofMinutes(5L);
-  private final Cache<Long, Runnable> commitFinalizerCache;
+
+  /** A {@link Runnable} and expiry time pair. */
+  @AutoValue
+  public abstract static class FinalizationInfo {
+    public abstract long getId();
+
+    public abstract Instant getExpiryTime();
+
+    public abstract Runnable getCallback();
+
+    public static FinalizationInfo create(Long id, Instant expiryTime, 
Runnable callback) {
+      return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id, 
expiryTime, callback);
+    }
+  }
+
+  private final ReentrantLock lock = new ReentrantLock();
+  private final Condition queueMinChanged = lock.newCondition();
+
+  @GuardedBy("lock")
+  private final HashMap<Long, FinalizationInfo> commitFinalizationCallbacks = 
new HashMap<>();
+
+  @GuardedBy("lock")
+  private final PriorityQueue<FinalizationInfo> cleanUpQueue =
+      new PriorityQueue<>(11, 
Comparator.comparing(FinalizationInfo::getExpiryTime));
+
+  @GuardedBy("lock")
+  private boolean cleanUpThreadStarted = false;
+
   private final BoundedQueueExecutor finalizationExecutor;
 
-  private StreamingCommitFinalizer(
-      Cache<Long, Runnable> commitFinalizerCache, BoundedQueueExecutor 
finalizationExecutor) {
-    this.commitFinalizerCache = commitFinalizerCache;
-    this.finalizationExecutor = finalizationExecutor;
+  private StreamingCommitFinalizer(BoundedQueueExecutor 
finalizationCleanupExecutor) {
+    finalizationExecutor = finalizationCleanupExecutor;
+  }
+
+  private void cleanupThreadBody() {
+    lock.lock();
+    try {
+      while (true) {
+        final @Nullable FinalizationInfo minValue = cleanUpQueue.peek();
+        if (minValue == null) {
+          // Wait for an element to be added and loop to re-examine the min.
+          queueMinChanged.await();
+          continue;
+        }
+        Duration timeDifference = new Duration(Instant.now(), minValue.getExpiryTime());
+        // A timed await returns false only if the timeout elapsed; a signal (new minimum) or a
+        // spurious wakeup returns true, and we loop to re-examine the current minimum.
+        if (timeDifference.getMillis() <= 0
+            || (!queueMinChanged.await(timeDifference.getMillis(), TimeUnit.MILLISECONDS)
+                && cleanUpQueue.peek() == minValue)) {
+          // The minimum expired without being finalized and is still the minimum; drop it unrun.
+          checkState(minValue == cleanUpQueue.poll());
+          checkState(commitFinalizationCallbacks.remove(minValue.getId()) == minValue);
+        }
+      }
+    } catch (InterruptedException e) {
+      // We're being shut down; restore the interrupt flag for the executor thread.
+      Thread.currentThread().interrupt();
+    } finally {
+      lock.unlock();
+    }
   }
 
   static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
-    return new StreamingCommitFinalizer(
-        
CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
-        workExecutor);
+    return new StreamingCommitFinalizer(workExecutor);
   }
 
   /**
    * Stores a map of user worker generated finalization ids and callbacks to 
execute once a commit
    * has been successfully committed to the backing state store.
    */
-  void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
-    commitFinalizerCache.putAll(commitCallbacks);
+  public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>> callbacks) {
+    for (Map.Entry<Long, Pair<Instant, Runnable>> entry : callbacks.entrySet()) {
+      Long finalizeId = entry.getKey();
+      final FinalizationInfo info =
+          FinalizationInfo.create(
+              finalizeId, entry.getValue().getLeft(), entry.getValue().getRight());
+      lock.lock();
+      try {
+        FinalizationInfo existingInfo = commitFinalizationCallbacks.putIfAbsent(finalizeId, info);
+        if (existingInfo != null) {
+          throw new IllegalStateException(
+              "Expected to not have any past callbacks for bundle "
+                  + finalizeId
+                  + " but had "
+                  + existingInfo);
+        }
+        if (!cleanUpThreadStarted) {
+          // Started lazily so pipelines and tests that never register callbacks skip it. The loop
+          // runs until interrupted; forceExecute is used so this never waits on executor capacity
+          // while `lock` is held, since finalizeCommits on other work threads also needs `lock`.
+          cleanUpThreadStarted = true;
+          finalizationExecutor.forceExecute(this::cleanupThreadBody, 0);
+        }
+        cleanUpQueue.add(info);
+        @SuppressWarnings("ReferenceEquality")
+        boolean newMin = cleanUpQueue.peek() == info;
+        if (newMin) {
+          queueMinChanged.signal();
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
   }
 
   /**
@@ -61,27 +157,48 @@ final class StreamingCommitFinalizer {
     * successfully persisted in the backing state store. If the commitCallback for the finalizationId
     * is still cached it is invoked.
     */
-  void finalizeCommits(Iterable<Long> finalizeIds) {
-    for (long finalizeId : finalizeIds) {
-      @Nullable Runnable finalizeCommit = commitFinalizerCache.getIfPresent(finalizeId);
-      // NOTE: It is possible the same callback id may be removed twice if
-      // windmill restarts.
-      // TODO: It is also possible for an earlier finalized id to be lost.
-      // We should automatically discard all older callbacks for the same computation and key.
-      if (finalizeCommit != null) {
-        commitFinalizerCache.invalidate(finalizeId);
-        finalizationExecutor.forceExecute(
-            () -> {
-              try {
-                finalizeCommit.run();
-              } catch (OutOfMemoryError oom) {
-                throw oom;
-              } catch (Throwable t) {
-                LOG.error("Source checkpoint finalization failed:", t);
-              }
-            },
-            0);
-      }
+  public void finalizeCommits(Iterable<Long> finalizeIds) {
+    if (Iterables.isEmpty(finalizeIds)) {
+      return;
+    }
+    List<Runnable> callbacksToExecute = new ArrayList<>();
+    lock.lock();
+    try {
+      for (long finalizeId : finalizeIds) {
+        @Nullable FinalizationInfo info = commitFinalizationCallbacks.remove(finalizeId);
+        if (info != null) {
+          checkState(cleanUpQueue.remove(info));
+          callbacksToExecute.add(info.getCallback());
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+    // Run callbacks outside the lock. The try/catch sits inside the submitted task so that a
+    // failing callback is logged instead of escaping onto the executor thread.
+    for (Runnable callback : callbacksToExecute) {
+      finalizationExecutor.forceExecute(
+          () -> {
+            try {
+              callback.run();
+            } catch (OutOfMemoryError oom) {
+              throw oom;
+            } catch (Throwable t) {
+              LOG.error("Commit finalization failed:", t);
+            }
+          },
+          0);
     }
   }
+
+  /** Returns how many registered callbacks are still pending (not yet finalized or expired). */
+  @VisibleForTesting
+  int cleanupQueueSize() {
+    lock.lock();
+    try {
+      return cleanUpQueue.size();
+    } finally {
+      lock.unlock();
+    }
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index bb936831d6e..b0f3a289902 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -246,6 +246,7 @@ public class StreamingWorkScheduler {
     // Before any processing starts, call any pending OnCommit callbacks.  
Nothing that requires
     // cleanup should be done before this, since we might exit early here.
     
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
+    commitFinalizer.finalizeCommits(workItem.getAppliedFinalizeIdsList());
     if (workItem.getSourceState().getOnlyFinalize()) {
       Windmill.WorkItemCommitRequest.Builder outputBuilder = 
initializeOutputBuilder(key, workItem);
       
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
index 9e45425562a..9c9f5386f44 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
@@ -36,20 +36,24 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.core.NullSideInputReader;
 import org.apache.beam.runners.core.SideInputReader;
 import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import 
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
 import 
org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution;
 import org.apache.beam.runners.dataflow.worker.counters.CounterName;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import 
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -62,16 +66,20 @@ import org.apache.beam.sdk.values.WindowingStrategy;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
 
 /** Tests for {@link SimpleParDoFn}. */
 @RunWith(JUnit4.class)
 public class SimpleParDoFnTest {
+
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   private PipelineOptions options;
@@ -95,6 +103,7 @@ public class SimpleParDoFnTest {
 
   // TODO: Replace TestDoFn usages with a mock DoFn to reduce boilerplate.
   static class TestDoFn extends DoFn<Integer, String> {
+
     enum State {
       UNSTARTED,
       SET_UP,
@@ -156,6 +165,7 @@ public class SimpleParDoFnTest {
   }
 
   static class TestErrorDoFn extends DoFn<Integer, String> {
+
     // Used to test nested stack traces.
     private void nestedFunctionBeta(String s) {
       throw new RuntimeException(s);
@@ -182,6 +192,7 @@ public class SimpleParDoFnTest {
   }
 
   static class TestReceiver implements Receiver {
+
     List<Object> receivedElems = new ArrayList<>();
 
     @Override
@@ -563,10 +574,10 @@ public class SimpleParDoFnTest {
    * @param inputData Input elements to process. For each element X, the DoFn 
will output a string
    *     repeated X times.
    * @return Delta counter updates extracted after execution.
-   * @throws Exception
    */
   private List<CounterUpdate> executeParDoFnCounterTest(int... inputData) 
throws Exception {
     class RepeaterDoFn extends DoFn<Integer, String> {
+
       /** Takes as input the number of times to output a message. */
       @ProcessElement
       public void processElement(ProcessContext c) {
@@ -616,6 +627,7 @@ public class SimpleParDoFnTest {
    * conversion according to the {@link PCollectionView} and projection to a 
particular window.
    */
   private static class EmptySideInputReader implements SideInputReader {
+
     private EmptySideInputReader() {}
 
     @Override
@@ -633,4 +645,101 @@ public class SimpleParDoFnTest {
       throw new IllegalArgumentException("calling getSideInput() with unknown 
view");
     }
   }
+
+  /**
+   * Verifies that a DoFn can request a {@link BundleFinalizer} in its {@code @StartBundle},
+   * {@code @ProcessElement} and {@code @FinishBundle} methods when executed through {@link
+   * SimpleParDoFn}, and that a callback registered from each phase is invoked.
+   */
+  @Test
+  public void testBundleFinalizer() throws Exception {
+    // The counters are static on the DoFn, so reset them before each run.
+    WithBundleFinalizerDoFn.startBundleCount.set(0);
+    WithBundleFinalizerDoFn.processElementCount.set(0);
+    WithBundleFinalizerDoFn.finishBundleCount.set(0);
+    DoFnInfo<Long, String> fnInfo =
+        DoFnInfo.forFn(
+            new WithBundleFinalizerDoFn(),
+            WindowingStrategy.globalDefault(),
+            null /* side input views */,
+            VarLongCoder.of(),
+            MAIN_OUTPUT,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+    // User-namespaced step context. bundleFinalizer() returns a stub whose afterBundleCommit runs
+    // the callback synchronously (no real commit takes place). Every other method is forwarded
+    // reflectively to the fixture's stepContext.
+    DataflowExecutionContext.DataflowStepContext userStepContext =
+        Mockito.mock(
+            DataflowExecutionContext.DataflowStepContext.class,
+            invocation -> {
+              if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+                return new BundleFinalizer() {
+                  @Override
+                  public void afterBundleCommit(Instant expiry, Callback 
callback) {
+                    // Checked exceptions from the callback are rethrown unchecked so that a
+                    // failing callback fails the test.
+                    try {
+                      callback.onBundleSuccess();
+                    } catch (Exception e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                };
+              }
+              return invocation.getMethod().invoke(stepContext, 
invocation.getArguments());
+            });
+
+    // Step context handed to SimpleParDoFn. namespacedToUser() yields the user context above;
+    // all other calls are forwarded to stepContext.
+    // NOTE(review): presumably the DoFn runner obtains its BundleFinalizer through
+    // namespacedToUser() -- confirm in SimpleParDoFn.
+    DataflowStepContext stepContextWithBundleFinalizer =
+        Mockito.mock(
+            DataflowStepContext.class,
+            invocation -> {
+              if (invocation.getMethod().getName().equals("namespacedToUser")) 
{
+                return userStepContext;
+              }
+              return invocation.getMethod().invoke(stepContext, 
invocation.getArguments());
+            });
+
+    ParDoFn parDoFn =
+        new SimpleParDoFn<>(
+            options,
+            DoFnInstanceManagers.singleInstance(fnInfo),
+            new EmptySideInputReader(),
+            MAIN_OUTPUT,
+            ImmutableMap.of(MAIN_OUTPUT, 0),
+            stepContextWithBundleFinalizer,
+            operationContext,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap(),
+            SimpleDoFnRunnerFactory.INSTANCE);
+
+    parDoFn.startBundle(new TestReceiver());
+
+    // Process a few elements
+    for (int i = 0; i < 5; i++) {
+      parDoFn.processElement(WindowedValues.valueInGlobalWindow(1L));
+    }
+
+    parDoFn.finishBundle();
+
+    // The stub runs callbacks immediately, so expect one callback per bundle-level phase and one
+    // per processed element.
+    assertThat(WithBundleFinalizerDoFn.startBundleCount.get(), equalTo(1));
+    assertThat(WithBundleFinalizerDoFn.processElementCount.get(), equalTo(5));
+    assertThat(WithBundleFinalizerDoFn.finishBundleCount.get(), equalTo(1));
+  }
+
+  /**
+   * DoFn that registers one bundle-finalization callback from each of its {@code @StartBundle},
+   * {@code @ProcessElement} and {@code @FinishBundle} methods. Every callback increments a static
+   * counter dedicated to the lifecycle phase that registered it.
+   */
+  static class WithBundleFinalizerDoFn extends DoFn<Long, String> {
+    private static final AtomicInteger startBundleCount = new AtomicInteger(0);
+    private static final AtomicInteger processElementCount = new AtomicInteger(0);
+    private static final AtomicInteger finishBundleCount = new AtomicInteger(0);
+
+    /** Expiry shared by all callbacks: five minutes after the moment of registration. */
+    private static Instant callbackExpiry() {
+      return Instant.now().plus(Duration.standardMinutes(5));
+    }
+
+    @StartBundle
+    public void startBundle(StartBundleContext context, BundleFinalizer bundleFinalizer) {
+      bundleFinalizer.afterBundleCommit(callbackExpiry(), startBundleCount::incrementAndGet);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) {
+      bundleFinalizer.afterBundleCommit(callbackExpiry(), processElementCount::incrementAndGet);
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context, BundleFinalizer bundleFinalizer) {
+      bundleFinalizer.afterBundleCommit(callbackExpiry(), finishBundleCount::incrementAndGet);
+    }
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
new file mode 100644
index 00000000000..7361d0be2cd
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link StreamingCommitFinalizer}. */
+@RunWith(JUnit4.class)
+public class StreamingCommitFinalizerTest {
+
+  private StreamingCommitFinalizer finalizer;
+  private BoundedQueueExecutor executor;
+
+  @Before
+  public void setUp() {
+    // Callback threads are daemons so a stuck callback never blocks JVM shutdown.
+    executor =
+        new BoundedQueueExecutor(
+            10,
+            60,
+            TimeUnit.SECONDS,
+            10,
+            10000000,
+            new ThreadFactoryBuilder()
+                .setNameFormat("FinalizationCallback-%d")
+                .setDaemon(true)
+                .build(),
+            /*useFairMonitor=*/ false);
+    finalizer = StreamingCommitFinalizer.create(executor);
+  }
+
+  @Test
+  public void testCreateAndInit() {
+    assertEquals(0, finalizer.cleanupQueueSize());
+  }
+
+  @Test
+  public void testCacheCommitFinalizer() {
+    Runnable callback = mock(Runnable.class);
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+    assertEquals(1, finalizer.cleanupQueueSize());
+    // Caching alone must never trigger the callback.
+    verify(callback, never()).run();
+  }
+
+  @Test
+  public void testThrowErrorOnDuplicateIds() {
+    Runnable callback1 = mock(Runnable.class);
+    Instant expiry = Instant.now().plus(Duration.standardHours(1));
+    finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry, callback1)));
+
+    Runnable callback2 = mock(Runnable.class);
+    Map<Long, Pair<Instant, Runnable>> duplicateCallback =
+        ImmutableMap.of(1L, Pair.of(expiry, callback2));
+    assertThrows(
+        IllegalStateException.class, () -> finalizer.cacheCommitFinalizers(duplicateCallback));
+  }
+
+  @Test
+  public void testFinalizeCommits() throws Exception {
+    CountDownLatch callbackExecuted = new CountDownLatch(1);
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.of(
+            1L,
+            Pair.of(
+                Instant.now().plus(Duration.standardHours(1)),
+                () -> callbackExecuted.countDown())));
+    finalizer.finalizeCommits(Collections.singletonList(1L));
+    assertTrue(callbackExecuted.await(30, TimeUnit.SECONDS));
+    assertEquals(0, finalizer.cleanupQueueSize());
+  }
+
+  @Test
+  public void testMultipleCommits() throws Exception {
+    CountDownLatch callback1Executed = new CountDownLatch(1);
+    CountDownLatch callback2Executed = new CountDownLatch(1);
+    CountDownLatch callback3Executed = new CountDownLatch(1);
+
+    Instant expiryTime = Instant.now().plus(Duration.standardHours(1));
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.<Long, Pair<Instant, Runnable>>builder()
+            .put(1L, Pair.of(expiryTime, () -> callback1Executed.countDown()))
+            .put(2L, Pair.of(expiryTime, () -> callback2Executed.countDown()))
+            .put(3L, Pair.of(expiryTime, () -> callback3Executed.countDown()))
+            .build());
+    // Finalize commits one at a time (in different order from added).
+    finalizer.finalizeCommits(Collections.singletonList(2L));
+    assertTrue(callback2Executed.await(30, TimeUnit.SECONDS));
+
+    finalizer.finalizeCommits(Collections.singletonList(3L));
+    assertTrue(callback3Executed.await(30, TimeUnit.SECONDS));
+
+    finalizer.finalizeCommits(Collections.singletonList(1L));
+    assertTrue(callback1Executed.await(30, TimeUnit.SECONDS));
+
+    assertEquals(0, finalizer.cleanupQueueSize());
+  }
+
+  @Test
+  public void testIgnoresUnknownIds() throws Exception {
+    Runnable callback = mock(Runnable.class);
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+    finalizer.finalizeCommits(Collections.singletonList(2L));
+    // NOTE(review): the single outstanding executor element is presumably the finalizer's own
+    // background cleanup task, i.e. no callback was scheduled for the unknown id -- confirm
+    // against StreamingCommitFinalizer.
+    assertEquals(1, executor.elementsOutstanding());
+    verify(callback, never()).run();
+    assertEquals(1, finalizer.cleanupQueueSize());
+  }
+
+  @Test
+  public void testCleanupOnExpiration() throws Exception {
+    CountDownLatch callback1Executed = new CountDownLatch(1);
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.of(
+            1L,
+            Pair.of(
+                Instant.now().plus(Duration.standardHours(1)),
+                () -> callback1Executed.countDown())));
+    assertEquals(1, finalizer.cleanupQueueSize());
+
+    Runnable callback2 = mock(Runnable.class);
+    Runnable callback3 = mock(Runnable.class);
+    Instant shortTimeout = Instant.now().plus(Duration.millis(100));
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.<Long, Pair<Instant, Runnable>>builder()
+            .put(2L, Pair.of(shortTimeout, callback2))
+            .put(3L, Pair.of(shortTimeout, callback3))
+            .build());
+
+    // Poll until the two 100ms finalizers have expired and been dropped. Give up after a bounded
+    // time so that a broken cleanup path fails the test instead of hanging it forever.
+    Instant deadline = Instant.now().plus(Duration.standardSeconds(30));
+    while (finalizer.cleanupQueueSize() > 1 && Instant.now().isBefore(deadline)) {
+      Thread.sleep(50);
+    }
+    assertEquals("expired finalizers were not cleaned up", 1, finalizer.cleanupQueueSize());
+    assertEquals(1, executor.elementsOutstanding());
+
+    // Finalizing ids whose callbacks already expired must not run those callbacks.
+    finalizer.finalizeCommits(ImmutableList.of(2L, 3L));
+    verify(callback2, never()).run();
+    verify(callback3, never()).run();
+
+    finalizer.finalizeCommits(Collections.singletonList(1L));
+    assertTrue(callback1Executed.await(30, TimeUnit.SECONDS));
+    assertEquals(0, finalizer.cleanupQueueSize());
+    // Callbacks are observed through latches here because they may run on the executor, so the
+    // immediate never() checks above could pass spuriously. Re-check now that a later
+    // finalization is known to have completed.
+    verify(callback2, never()).run();
+    verify(callback3, never()).run();
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
 
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index 0b179c4d338..ca650907ffe 100644
--- 
a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ 
b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -447,6 +447,10 @@ message WorkItem {
   // elements mapped to a single key to impact pipeline performance. When
   // present, this field includes metadata associated with any hot key.
   optional HotKeyInfo hot_key_info = 11;
+
+  repeated int64 applied_finalize_ids = 16;
+
+  reserved 12, 13, 14, 15;
 }
 
 message ComputationWorkItems {
@@ -653,10 +657,16 @@ message WorkItemCommitRequest {
   // Collected work item processing state durations.
   repeated LatencyAttribution per_work_item_latency_attributions = 27;
 
+  // Ids that will be passed back as applied_finalize_ids in a subsequent
+  // GetWorkResponse once the state in this request has been persisted to disk
+  // successfully. This is best-effort; it is possible that state is persisted
+  // but the finalize ids are not sent to the worker.
+  repeated int64 finalize_ids = 19 [packed = true];
+
   // DEPRECATED
   repeated GlobalDataId global_data_id_requests = 9;
 
-  reserved 6, 19, 23;
+  reserved 6, 23;
 }
 
 message ComputationCommitWorkRequest {
@@ -791,11 +801,14 @@ message StreamingGetWorkResponseChunk {
   // from other stream_ids may be interleaved on the physical stream.
   optional fixed64 stream_id = 4;
 
+  // Finalize ids associated with successfully applied work from this worker
+  repeated int64 applied_finalize_ids = 6 [packed = true];
+
   // Timing infos for the work item. Windmill Dispatcher and user worker should
   // propagate critical event timings if the list is not empty.
   repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8;
 
-  // reserved field 5
+  reserved 5, 7;
 }
 
 message ComputationWorkItemMetadata {


Reply via email to