This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3516b5d318f [Dataflow Streaming] Fix race in 
StreamingEngineWorkCommitter during stream shutdown (#37104)
3516b5d318f is described below

commit 3516b5d318fa4c635c4dc7cc6ba0bc9a057dcdd1
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Dec 16 00:41:26 2025 -0800

    [Dataflow Streaming] Fix race in StreamingEngineWorkCommitter during stream 
shutdown (#37104)
---
 .../commits/StreamingEngineWorkCommitter.java      | 24 ++++---
 .../commits/StreamingEngineWorkCommitterTest.java  | 74 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 9 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java
index 85fa1d67c6c..b68f53121b8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitter.java
@@ -45,6 +45,7 @@ import org.slf4j.LoggerFactory;
 @Internal
 @ThreadSafe
 public final class StreamingEngineWorkCommitter implements WorkCommitter {
+
   private static final Logger LOG = 
LoggerFactory.getLogger(StreamingEngineWorkCommitter.class);
   private static final int TARGET_COMMIT_BATCH_KEYS = 5;
   private static final String NO_BACKEND_WORKER_TOKEN = "";
@@ -99,19 +100,23 @@ public final class StreamingEngineWorkCommitter implements 
WorkCommitter {
 
   @Override
   public void commit(Commit commit) {
-    boolean isShutdown = !this.isRunning.get();
-    if (commit.work().isFailed() || isShutdown) {
-      if (isShutdown) {
-        LOG.debug(
-            "Trying to queue commit on shutdown, failing 
commit=[computationId={}, shardingKey={}, workId={} ].",
-            commit.computationId(),
-            commit.work().getShardedKey(),
-            commit.work().id());
-      }
+    if (commit.work().isFailed()) {
       failCommit(commit);
     } else {
       commitQueue.put(commit);
     }
+
+    // Do this check after adding to commitQueue, else commitQueue.put() can 
race with
+    // drainCommitQueue() in stop() and leave commits orphaned in the queue.
+    if (!this.isRunning.get()) {
+      LOG.debug(
+          "Trying to queue commit on shutdown, failing 
commit=[computationId={}, shardingKey={},"
+              + " workId={} ].",
+          commit.computationId(),
+          commit.work().getShardedKey(),
+          commit.work().id());
+      drainCommitQueue();
+    }
   }
 
   @Override
@@ -255,6 +260,7 @@ public final class StreamingEngineWorkCommitter implements 
WorkCommitter {
 
   @AutoBuilder
   public interface Builder {
+
     Builder setCommitWorkStreamFactory(
         Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory);
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
index b4f63fa7161..01197622c24 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
@@ -34,13 +34,21 @@ import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import org.apache.beam.runners.dataflow.worker.FakeWindmillServer;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import org.apache.beam.runners.dataflow.worker.streaming.Watermarks;
+import org.apache.beam.runners.dataflow.worker.streaming.WeightedSemaphore;
 import org.apache.beam.runners.dataflow.worker.streaming.Work;
 import org.apache.beam.runners.dataflow.worker.streaming.WorkId;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
@@ -67,6 +75,7 @@ import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
 public class StreamingEngineWorkCommitterTest {
+
   @Rule public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule();
   @Rule public ErrorCollector errorCollector = new ErrorCollector();
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
@@ -75,9 +84,17 @@ public class StreamingEngineWorkCommitterTest {
   private Supplier<CloseableStream<CommitWorkStream>> commitWorkStreamFactory;
 
   private static void waitForExpectedSetSize(Set<?> s, int expectedSize) {
+    long deadline = System.currentTimeMillis() + 100 * 1000; // 100 seconds
     while (s.size() < expectedSize) {
       try {
         Thread.sleep(10);
+        if (System.currentTimeMillis() > deadline) {
+          throw new RuntimeException(
+              "Timed out waiting for expected set size to be: "
+                  + expectedSize
+                  + " but was: "
+                  + s.size());
+        }
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
       }
@@ -400,4 +417,61 @@ public class StreamingEngineWorkCommitterTest {
 
     workCommitter.stop();
   }
+
+  @Test
+  public void testStop_drainsCommitQueue_concurrentCommit()
+      throws InterruptedException, ExecutionException, TimeoutException {
+    Set<CompleteCommit> completeCommits = Collections.newSetFromMap(new 
ConcurrentHashMap<>());
+    workCommitter =
+        StreamingEngineWorkCommitter.builder()
+            // Set the semaphore to only allow a single commit at a time.
+            // This creates a bottleneck on purpose to trigger race conditions 
during shutdown.
+            .setCommitByteSemaphore(WeightedSemaphore.create(1, (commit) -> 1))
+            .setCommitWorkStreamFactory(commitWorkStreamFactory)
+            .setOnCommitComplete(completeCommits::add)
+            .build();
+
+    int numThreads = 5;
+    ExecutorService producer = Executors.newFixedThreadPool(numThreads);
+    AtomicBoolean producing = new AtomicBoolean(true);
+    AtomicLong sentCommits = new AtomicLong(0);
+
+    workCommitter.start();
+
+    AtomicLong workToken = new AtomicLong(0);
+    List<Future<?>> futures = new ArrayList<>(numThreads);
+    for (int i = 0; i < numThreads; i++) {
+      futures.add(
+          producer.submit(
+              () -> {
+                while (producing.get()) {
+                  Work work = createMockWork(workToken.getAndIncrement());
+                  WorkItemCommitRequest commitRequest =
+                      WorkItemCommitRequest.newBuilder()
+                          .setKey(work.getWorkItem().getKey())
+                          .setShardingKey(work.getWorkItem().getShardingKey())
+                          .setWorkToken(work.getWorkItem().getWorkToken())
+                          .setCacheToken(work.getWorkItem().getCacheToken())
+                          .build();
+                  Commit commit =
+                      Commit.create(commitRequest, 
createComputationState("computationId"), work);
+                  workCommitter.commit(commit);
+                  sentCommits.incrementAndGet();
+                }
+              }));
+    }
+
+    // Let it run for a bit
+    Thread.sleep(100);
+
+    workCommitter.stop();
+    producing.set(false);
+    producer.shutdown();
+    assertTrue(producer.awaitTermination(10, TimeUnit.SECONDS));
+    for (Future<?> future : futures) {
+      future.get(10, TimeUnit.SECONDS);
+    }
+
+    waitForExpectedSetSize(completeCommits, sentCommits.intValue());
+  }
 }

Reply via email to