This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 85c1b88ce5a [Dataflow Streaming] Create a separate option to control commit threads with direct path. (#37848)
85c1b88ce5a is described below

commit 85c1b88ce5aa9fb00dd321c8b8ef8534f5f01959
Author: Arun Pandian <[email protected]>
AuthorDate: Mon Mar 16 01:46:14 2026 -0700

    [Dataflow Streaming] Create a separate option to control commit threads with direct path. (#37848)
    
    In direct path mode, the number of commit threads is set per backend Windmill worker.
---
 .../runners/dataflow/options/DataflowStreamingPipelineOptions.java | 7 +++++++
 .../beam/runners/dataflow/worker/StreamingDataflowWorker.java      | 3 ++-
 2 files changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
index 68e794f2d3b..9727048e47a 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java
@@ -100,6 +100,13 @@ public interface DataflowStreamingPipelineOptions extends 
PipelineOptions {
 
   void setWindmillServiceCommitThreads(Integer value);
 
+  @Description(
+      "Number of commit threads per backend windmill worker in streaming 
engine direct path mode.")
+  @Default.Integer(1)
+  Integer getWindmillServiceDirectPathCommitThreads();
+
+  void setWindmillServiceDirectPathCommitThreads(Integer value);
+
   @Description(
       "Frequency at which active work should be reported back to Windmill, in 
millis. "
           + "The first refresh will occur after at least this much time has 
passed since "
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 2f389f92019..172ca2b550c 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -419,7 +419,8 @@ public final class StreamingDataflowWorker {
                     .setCommitByteSemaphore(maxCommitByteSemaphore)
                     
.setBackendWorkerToken(commitWorkStream.backendWorkerToken())
                     .setOnCommitComplete(this::onCompleteCommit)
-                    
.setNumCommitSenders(Math.max(options.getWindmillServiceCommitThreads(), 1))
+                    .setNumCommitSenders(
+                        
Math.max(options.getWindmillServiceDirectPathCommitThreads(), 1))
                     .setCommitWorkStreamFactory(
                         () -> CloseableStream.create(commitWorkStream, () -> 
{}))
                     .build(),

Reply via email to