This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ef3a2446bc Change KafkaIO to default to offset-based deduplication when redistribute is enabled for Dataflow java runner. (#36849)
8ef3a2446bc is described below

commit 8ef3a2446bc8251bd11e1a08580d5f9a9c2e31db
Author: Tom Stepp <[email protected]>
AuthorDate: Fri Nov 21 02:16:31 2025 -0800

    Change KafkaIO to default to offset-based deduplication when redistribute is enabled for Dataflow java runner. (#36849)
    
    * Add kafka read override to Dataflow java runner.
---
 runners/google-cloud-dataflow-java/build.gradle    |   2 +
 .../beam/runners/dataflow/DataflowRunner.java      |   4 +
 .../KafkaReadWithRedistributeOverride.java         |  75 ++++++++++++
 .../KafkaReadWithRedistributeOverrideTest.java     | 133 +++++++++++++++++++++
 4 files changed, 214 insertions(+)

diff --git a/runners/google-cloud-dataflow-java/build.gradle 
b/runners/google-cloud-dataflow-java/build.gradle
index 415132fa7d2..0961a385b21 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -129,6 +129,8 @@ dependencies {
   testImplementation library.java.google_cloud_dataflow_java_proto_library_all
   testImplementation library.java.jackson_dataformat_yaml
   testImplementation library.java.mockito_inline
+  testImplementation project(":sdks:java:io:kafka")
+  testImplementation library.java.kafka_clients
   validatesRunner project(path: ":sdks:java:core", configuration: "shadowTest")
   validatesRunner project(path: project.path, configuration: 
"testRuntimeMigration")
   validatesRunner library.java.hamcrest
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7e23182042c..7d0a151b48b 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -659,6 +659,10 @@ public class DataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
 
       try {
         overridesBuilder.add(KafkaIO.Read.KAFKA_READ_OVERRIDE);
+        overridesBuilder.add(
+            PTransformOverride.of(
+                KafkaReadWithRedistributeOverride.matcher(),
+                new KafkaReadWithRedistributeOverride.Factory()));
       } catch (NoClassDefFoundError e) {
         // Do nothing. io-kafka is an optional dependency of 
runners-google-cloud-dataflow-java
         // and only needed when KafkaIO is used in the pipeline.
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
new file mode 100644
index 00000000000..89f0eef9b8c
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverride.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import java.util.Map;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.kafka.KafkaRecord;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.util.construction.ReplacementOutputs;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Dataflow runner override for {@link KafkaIO.Read} transforms that have redistribute enabled.
+ *
+ * <p>{@link #matcher()} selects the reads to replace and {@link Factory} builds the replacement,
+ * which turns on offset-based deduplication unless the read already carries an explicit setting.
+ */
+public final class KafkaReadWithRedistributeOverride {
+
+  private KafkaReadWithRedistributeOverride() {}
+
+  /**
+   * Returns a matcher that accepts an applied transform only when it is a {@link KafkaIO.Read}
+   * whose {@code isRedistributed()} is {@code true}.
+   *
+   * <p>NOTE(review): the transform produced by {@link Factory} is still a redistributed {@link
+   * KafkaIO.Read}, so this matcher accepts it as well. Confirm whether {@code
+   * matchesDuringValidation} needs to be overridden for the post-replacement validation done by
+   * {@code Pipeline#replaceAll}.
+   */
+  public static PTransformMatcher matcher() {
+    return new PTransformMatcher() {
+      @SuppressWarnings({
+        "PatternMatchingInstanceof" // For compiling on older Java versions.
+      })
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        if (application.getTransform() instanceof KafkaIO.Read) {
+          // Cast to the wildcard form instead of the raw type; the type arguments are irrelevant
+          // to the redistribute check.
+          return ((KafkaIO.Read<?, ?>) application.getTransform()).isRedistributed();
+        }
+        return false;
+      }
+    };
+  }
+
+  /**
+   * {@link PTransformOverrideFactory} for {@link KafkaIO.Read} that enables {@code
+   * withOffsetDeduplication} on the reads it is given.
+   *
+   * <p>The factory does not check redistribute itself; it is meant to be paired with {@link
+   * #matcher()}. An explicit setting is respected: the read is changed only when {@code
+   * getOffsetDeduplication()} returns {@code null}.
+   *
+   * @param <K> key type of the Kafka records produced by the read
+   * @param <V> value type of the Kafka records produced by the read
+   */
+  static class Factory<K, V>
+      implements PTransformOverrideFactory<
+          PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> {
+
+    /**
+     * Builds the replacement for {@code transform}.
+     *
+     * @param transform the applied Kafka read being replaced
+     * @return a replacement rooted at the pipeline's {@code begin()}; it wraps the original read
+     *     with offset deduplication set to {@code true} when no value was configured, and the
+     *     original read unchanged otherwise
+     */
+    @Override
+    public PTransformReplacement<PBegin, PCollection<KafkaRecord<K, V>>> getReplacementTransform(
+        AppliedPTransform<PBegin, PCollection<KafkaRecord<K, V>>, KafkaIO.Read<K, V>> transform) {
+      KafkaIO.Read<K, V> read = transform.getTransform();
+      // A null setting is treated as "not configured"; only then is deduplication defaulted on.
+      KafkaIO.Read<K, V> replacement =
+          read.getOffsetDeduplication() == null ? read.withOffsetDeduplication(true) : read;
+      return PTransformReplacement.of(transform.getPipeline().begin(), replacement);
+    }
+
+    /**
+     * Maps the single output of the original read onto the single output of the replacement.
+     *
+     * @param outputs the outputs of the transform being replaced
+     * @param newOutput the output produced by the replacement transform
+     */
+    @Override
+    public Map<PCollection<?>, ReplacementOutput> mapOutputs(
+        Map<TupleTag<?>, PCollection<?>> outputs, PCollection<KafkaRecord<K, V>> newOutput) {
+      return ReplacementOutputs.singleton(outputs, newOutput);
+    }
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
new file mode 100644
index 00000000000..05e5dd6a55d
--- /dev/null
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/KafkaReadWithRedistributeOverrideTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.runners.PTransformOverride;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link KafkaReadWithRedistributeOverride}. */
+@RunWith(JUnit4.class)
+public class KafkaReadWithRedistributeOverrideTest implements Serializable {
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  /** Builds the read configuration shared by every case; callers add the options under test. */
+  private static KafkaIO.Read<String, String> readFrom(String topic) {
+    return KafkaIO.<String, String>read()
+        .withBootstrapServers("localhost:9092")
+        .withTopic(topic)
+        .withKeyDeserializer(StringDeserializer.class)
+        .withValueDeserializer(StringDeserializer.class);
+  }
+
+  /**
+   * Applies four Kafka reads with different redistribute / offset-deduplication settings, runs
+   * the override, and checks the resulting configuration of each read exactly once.
+   */
+  @Test
+  public void testOverrideAppliedWhenRedistributeEnabled() {
+    p.apply("MatchingRead", readFrom("test_match").withRedistribute());
+    p.apply("NoRedistribute", readFrom("test_no_redistribute"));
+    // NOTE(review): this read does not enable redistribute, so the override's matcher does not
+    // select it; redistribute combined with withOffsetDeduplication(false) is not covered here.
+    p.apply("ExplicitlyDisable", readFrom("test_disabled").withOffsetDeduplication(false));
+    p.apply(
+        "ExplicitlyEnable",
+        readFrom("test_enabled").withRedistribute().withOffsetDeduplication(true));
+
+    PTransformOverride redistributeOverride =
+        PTransformOverride.of(
+            KafkaReadWithRedistributeOverride.matcher(),
+            new KafkaReadWithRedistributeOverride.Factory<>());
+    p.replaceAll(Collections.singletonList(redistributeOverride));
+
+    p.traverseTopologically(
+        new Pipeline.PipelineVisitor.Defaults() {
+
+          // One flag per read; each must flip exactly once during the traversal.
+          private boolean matchingVisited = false;
+          private boolean noRedistributeVisited = false;
+          private boolean explicitlyDisabledVisited = false;
+          private boolean explicitlyEnabledVisited = false;
+
+          @Override
+          public CompositeBehavior enterCompositeTransform(Node node) {
+            if (!(node.getTransform() instanceof KafkaIO.Read)) {
+              return CompositeBehavior.ENTER_TRANSFORM;
+            }
+            KafkaIO.Read<?, ?> kafkaRead = (KafkaIO.Read<?, ?>) node.getTransform();
+            if (kafkaRead.getTopics().contains("test_match")) {
+              // Redistribute with no explicit setting: the override turns deduplication on.
+              assertTrue(kafkaRead.isRedistributed());
+              assertTrue(kafkaRead.getOffsetDeduplication());
+              assertFalse(matchingVisited);
+              matchingVisited = true;
+            } else if (kafkaRead.getTopics().contains("test_no_redistribute")) {
+              // No redistribute: the read is left alone and the setting stays unset.
+              assertFalse(kafkaRead.isRedistributed());
+              assertThat(kafkaRead.getOffsetDeduplication(), nullValue());
+              assertFalse(noRedistributeVisited);
+              noRedistributeVisited = true;
+            } else if (kafkaRead.getTopics().contains("test_disabled")) {
+              assertFalse(kafkaRead.isRedistributed());
+              assertFalse(kafkaRead.getOffsetDeduplication());
+              assertFalse(explicitlyDisabledVisited);
+              explicitlyDisabledVisited = true;
+            } else if (kafkaRead.getTopics().contains("test_enabled")) {
+              assertTrue(kafkaRead.isRedistributed());
+              assertTrue(kafkaRead.getOffsetDeduplication());
+              assertFalse(explicitlyEnabledVisited);
+              explicitlyEnabledVisited = true;
+            }
+            return CompositeBehavior.ENTER_TRANSFORM;
+          }
+
+          @Override
+          public void leaveCompositeTransform(Node node) {
+            if (!node.isRootNode()) {
+              return;
+            }
+            // Leaving the root ends the traversal; every read must have been seen by now.
+            assertTrue("Matching transform was not visited", matchingVisited);
+            assertTrue("No redistribute transform was not visited", noRedistributeVisited);
+            assertTrue(
+                "Explicitly disabled transform was not visited", explicitlyDisabledVisited);
+            assertTrue("Explicitly enabled transform was not visited", explicitlyEnabledVisited);
+          }
+        });
+  }
+}

Reply via email to