This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7095c880f34 [Dataflow Streaming] Mark support for windmill state tag
encoding v2 (#37683)
7095c880f34 is described below
commit 7095c880f34469ffc07fe483d11d3d3800265062
Author: Arun Pandian <[email protected]>
AuthorDate: Tue Feb 24 00:55:53 2026 -0800
[Dataflow Streaming] Mark support for windmill state tag encoding v2
(#37683)
* [Dataflow Streaming] Mark support for windmill state tag encoding v2
The added experiment informs the backend of the support and makes jobs having
this change eligible for state tag encoding v2.
Post-submit tests with v2 encoding pass, and internal test suites pass
with encoding v2.
---
.../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 11 +++++++++++
.../org/apache/beam/runners/dataflow/DataflowRunnerTest.java | 5 +++--
2 files changed, 14 insertions(+), 2 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d826c3b2a38..9d963af0ecb 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -1299,6 +1299,17 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
if (shouldActAsStreaming(pipeline)) {
options.setStreaming(true);
+ {
+ List<String> experiments =
+ options.getExperiments() == null
+ ? new ArrayList<>()
+ : new ArrayList<>(options.getExperiments());
+ // Experiment marking that the harness supports tag encoding v2
+ // Backend will enable tag encoding v2 only if the harness supports it.
+ experiments.add("streaming_engine_state_tag_encoding_v2_supported");
+ options.setExperiments(ImmutableList.copyOf(experiments));
+ }
+
if (useUnifiedWorker(options)) {
options.setEnableStreamingEngine(true);
}
diff --git
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index ee5a7e1d26c..8c33123be6d 100644
---
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -26,6 +26,7 @@ import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.instanceOf;
@@ -1791,7 +1792,7 @@ public class DataflowRunnerTest implements Serializable {
assertFalse(options.isEnableStreamingEngine());
assertThat(
options.getExperiments(),
- containsInAnyOrder(
+ hasItems(
"beam_fn_api", "use_runner_v2", "use_unified_worker",
"use_portable_job_submission"));
}
@@ -1807,7 +1808,7 @@ public class DataflowRunnerTest implements Serializable {
assertTrue(options.isEnableStreamingEngine());
assertThat(
options.getExperiments(),
- containsInAnyOrder(
+ hasItems(
"beam_fn_api",
"use_runner_v2",
"use_unified_worker",