This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4aff88ea91a Revert "[Dataflow Streaming] Prevent commit threads from
sharing commit streams" (#37873)
4aff88ea91a is described below
commit 4aff88ea91a19776405e46254d9f93629f9329f2
Author: Arun Pandian <[email protected]>
AuthorDate: Mon Mar 16 17:12:19 2026 -0700
Revert "[Dataflow Streaming] Prevent commit threads from sharing commit
streams" (#37873)
---
.../beam/runners/dataflow/worker/StreamingDataflowWorker.java | 9 +++------
1 file changed, 3 insertions(+), 6 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 9fb723e812f..172ca2b550c 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -464,13 +464,10 @@ public final class StreamingDataflowWorker {
@SuppressWarnings("methodref.receiver.bound")
WorkCommitter workCommitter =
StreamingEngineWorkCommitter.builder()
- // Use a separate stream pool for each committer. This ensures the commit
- // threads are fully isolated.
.setCommitWorkStreamFactory(
- () ->
- WindmillStreamPool.create(
- 1, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
- .getCloseableStream())
+ WindmillStreamPool.create(
+ numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
+ ::getCloseableStream)
.setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
.setNumCommitSenders(numCommitThreads)
.setOnCommitComplete(this::onCompleteCommit)