This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aff88ea91a Revert "[Dataflow Streaming] Prevent commit threads from sharing commit streams" (#37873)
4aff88ea91a is described below

commit 4aff88ea91a19776405e46254d9f93629f9329f2
Author: Arun Pandian <[email protected]>
AuthorDate: Mon Mar 16 17:12:19 2026 -0700

    Revert "[Dataflow Streaming] Prevent commit threads from sharing commit streams" (#37873)
---
 .../beam/runners/dataflow/worker/StreamingDataflowWorker.java    | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 9fb723e812f..172ca2b550c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -464,13 +464,10 @@ public final class StreamingDataflowWorker {
     @SuppressWarnings("methodref.receiver.bound")
     WorkCommitter workCommitter =
         StreamingEngineWorkCommitter.builder()
-            // Use a separate stream pool for each committer. This ensures the commit
-            // threads are fully isolated.
             .setCommitWorkStreamFactory(
-                () ->
-                    WindmillStreamPool.create(
-                            1, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
-                        .getCloseableStream())
+                WindmillStreamPool.create(
+                        numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
+                    ::getCloseableStream)
             .setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
             .setNumCommitSenders(numCommitThreads)
             .setOnCommitComplete(this::onCompleteCommit)

Reply via email to