This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 85c1b88ce5a [Dataflow Streaming] Create a separate option to control commit threads with direct path. (#37848)
85c1b88ce5a is described below
commit 85c1b88ce5aa9fb00dd321c8b8ef8534f5f01959
Author: Arun Pandian <[email protected]>
AuthorDate: Mon Mar 16 01:46:14 2026 -0700
[Dataflow Streaming] Create a separate option to control commit threads
with direct path. (#37848)
The number of commit threads will be per backend windmill worker.
---
.../runners/dataflow/options/DataflowStreamingPipelineOptions.java | 7 +++++++
.../beam/runners/dataflow/worker/StreamingDataflowWorker.java | 3 ++-
2 files changed, 9 insertions(+), 1 deletion(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 68e794f2d3b..9727048e47a 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -100,6 +100,13 @@ public interface DataflowStreamingPipelineOptions extends
PipelineOptions {
void setWindmillServiceCommitThreads(Integer value);
+ @Description(
+ "Number of commit threads per backend windmill worker in streaming engine direct path mode.")
+ @Default.Integer(1)
+ Integer getWindmillServiceDirectPathCommitThreads();
+
+ void setWindmillServiceDirectPathCommitThreads(Integer value);
+
@Description(
"Frequency at which active work should be reported back to Windmill, in
millis. "
+ "The first refresh will occur after at least this much time has
passed since "
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 2f389f92019..172ca2b550c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -419,7 +419,8 @@ public final class StreamingDataflowWorker {
.setCommitByteSemaphore(maxCommitByteSemaphore)
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
.setOnCommitComplete(this::onCompleteCommit)
- .setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+ .setNumCommitSenders(
+ Math.max(options.getWindmillServiceDirectPathCommitThreads(), 1))
.setCommitWorkStreamFactory(
() -> CloseableStream.create(commitWorkStream, () -> {}))
.build(),