This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7095c880f34 [Dataflow Streaming] Mark support for windmill state tag 
encoding v2 (#37683)
7095c880f34 is described below

commit 7095c880f34469ffc07fe483d11d3d3800265062
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Feb 24 00:55:53 2026 -0800

    [Dataflow Streaming] Mark support for windmill state tag encoding v2 
(#37683)
    
    * [Dataflow Streaming] Mark support for windmill state tag encoding v2
    
    The added experiment informs the backend of this support and makes jobs
    that include this change eligible for state tag encoding v2.
    
    Post-submit tests pass with v2 encoding, and internal test suites also
    pass with encoding v2.
---
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 11 +++++++++++
 .../org/apache/beam/runners/dataflow/DataflowRunnerTest.java  |  5 +++--
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d826c3b2a38..9d963af0ecb 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1299,6 +1299,17 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     if (shouldActAsStreaming(pipeline)) {
       options.setStreaming(true);
 
+      {
+        List<String> experiments =
+            options.getExperiments() == null
+                ? new ArrayList<>()
+                : new ArrayList<>(options.getExperiments());
+        // Experiment marking that the harness supports tag encoding v2
+        // Backend will enable tag encoding v2 only if the harness supports it.
+        experiments.add("streaming_engine_state_tag_encoding_v2_supported");
+        options.setExperiments(ImmutableList.copyOf(experiments));
+      }
+
       if (useUnifiedWorker(options)) {
         options.setEnableStreamingEngine(true);
       }
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ee5a7e1d26c..8c33123be6d 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.endsWith;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasEntry;
 import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
 import static org.hamcrest.Matchers.hasKey;
 import static org.hamcrest.Matchers.hasProperty;
 import static org.hamcrest.Matchers.instanceOf;
@@ -1791,7 +1792,7 @@ public class DataflowRunnerTest implements Serializable {
       assertFalse(options.isEnableStreamingEngine());
       assertThat(
           options.getExperiments(),
-          containsInAnyOrder(
+          hasItems(
               "beam_fn_api", "use_runner_v2", "use_unified_worker", 
"use_portable_job_submission"));
     }
 
@@ -1807,7 +1808,7 @@ public class DataflowRunnerTest implements Serializable {
       assertTrue(options.isEnableStreamingEngine());
       assertThat(
           options.getExperiments(),
-          containsInAnyOrder(
+          hasItems(
               "beam_fn_api",
               "use_runner_v2",
               "use_unified_worker",

Reply via email to