This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 60436605e19 Add schema provider support for Kafka redistribute options (#36332)
60436605e19 is described below

commit 60436605e1983d9bc34f5be4aec881124e22b13d
Author: Tom Stepp <[email protected]>
AuthorDate: Sun Oct 5 13:51:12 2025 -0700

    Add schema provider support for Kafka redistribute options (#36332)
    
    * Add deterministic sharding unit test.
    
    * Refactor to specific deterministic Kafka redistribute method.
    
    * Add redistribute by key variant.
    
    * Actually enable withRedistributeByRecordKey in KafkaIOTest.
    
    * Add byRecordKey property to Kafka read compatibility.
    
    * Rebase and revert method rename for debugging.
    
    * Add schema provider for redistribute options
    
    * Address spotless findings to simplify boolean expressions
    
    * Revert accidental changes from merge conflict resolution
    
    * Refactor into helper method.
---
 .../KafkaReadSchemaTransformConfiguration.java     | 30 ++++++++++++++++++++++
 .../io/kafka/KafkaReadSchemaTransformProvider.java | 29 +++++++++++++++++++++
 .../KafkaReadSchemaTransformProviderTest.java      |  7 ++++-
 3 files changed, 65 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
index 47e0b2a9aca..2ac8370099f 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java
@@ -160,6 +160,26 @@ public abstract class KafkaReadSchemaTransformConfiguration {
   @Nullable
   public abstract ErrorHandling getErrorHandling();
 
+  @SchemaFieldDescription("If the Kafka read should be redistributed.")
+  @Nullable
+  public abstract Boolean getRedistributed();
+
+  @SchemaFieldDescription("If the Kafka read allows duplicates.")
+  @Nullable
+  public abstract Boolean getAllowDuplicates();
+
+  @SchemaFieldDescription("The number of keys for redistributing Kafka 
inputs.")
+  @Nullable
+  public abstract Integer getRedistributeNumKeys();
+
+  @SchemaFieldDescription("If the redistribute is using offset deduplication 
mode.")
+  @Nullable
+  public abstract Boolean getOffsetDeduplication();
+
+  @SchemaFieldDescription("If the redistribute keys by the Kafka record key.")
+  @Nullable
+  public abstract Boolean getRedistributeByRecordKey();
+
   /** Builder for the {@link KafkaReadSchemaTransformConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
@@ -190,6 +210,16 @@ public abstract class KafkaReadSchemaTransformConfiguration {
 
     public abstract Builder setErrorHandling(ErrorHandling errorHandling);
 
+    public abstract Builder setRedistributed(Boolean redistribute);
+
+    public abstract Builder setAllowDuplicates(Boolean allowDuplicates);
+
+    public abstract Builder setRedistributeNumKeys(Integer redistributeNumKeys);
+
+    public abstract Builder setOffsetDeduplication(Boolean offsetDeduplication);
+
+    public abstract Builder setRedistributeByRecordKey(Boolean redistributeByRecordKey);
+
     /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */
     public abstract KafkaReadSchemaTransformConfiguration build();
   }
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
index 57fac43640a..74f9b147bbd 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java
@@ -166,6 +166,31 @@ public class KafkaReadSchemaTransformProvider
       return SchemaRegistryProvider.UNSPECIFIED;
     }
 
+    private static <K, V> KafkaIO.Read<K, V> applyRedistributeSettings(
+        KafkaIO.Read<K, V> kafkaRead, KafkaReadSchemaTransformConfiguration configuration) {
+      Boolean redistribute = configuration.getRedistributed();
+      if (redistribute != null && redistribute) {
+        kafkaRead = kafkaRead.withRedistribute();
+      }
+      Integer redistributeNumKeys = configuration.getRedistributeNumKeys();
+      if (redistributeNumKeys != null && redistributeNumKeys > 0) {
+        kafkaRead = kafkaRead.withRedistributeNumKeys(redistributeNumKeys);
+      }
+      Boolean allowDuplicates = configuration.getAllowDuplicates();
+      if (allowDuplicates != null) {
+        kafkaRead = kafkaRead.withAllowDuplicates(allowDuplicates);
+      }
+      Boolean redistributeByRecordKey = configuration.getRedistributeByRecordKey();
+      if (redistributeByRecordKey != null) {
+        kafkaRead = kafkaRead.withRedistributeByRecordKey(redistributeByRecordKey);
+      }
+      Boolean offsetDeduplication = configuration.getOffsetDeduplication();
+      if (offsetDeduplication != null) {
+        kafkaRead = kafkaRead.withOffsetDeduplication(offsetDeduplication);
+      }
+      return kafkaRead;
+    }
+
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       configuration.validate();
@@ -233,6 +258,8 @@ public class KafkaReadSchemaTransformProvider
          kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds));
         }
 
+        kafkaRead = applyRedistributeSettings(kafkaRead, configuration);
+
        PCollection<GenericRecord> kafkaValues =
            input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
 
@@ -283,6 +310,8 @@ public class KafkaReadSchemaTransformProvider
        kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(maxReadTimeSeconds));
       }
 
+      kafkaRead = applyRedistributeSettings(kafkaRead, configuration);
+
      PCollection<byte[]> kafkaValues =
          input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
 
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
index dc97dadf6e9..3c19f85c300 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProviderTest.java
@@ -130,7 +130,12 @@ public class KafkaReadSchemaTransformProviderTest {
             "error_handling",
             "file_descriptor_path",
             "message_name",
-            "max_read_time_seconds"),
+            "max_read_time_seconds",
+            "redistributed",
+            "allow_duplicates",
+            "offset_deduplication",
+            "redistribute_num_keys",
+            "redistribute_by_record_key"),
         kafkaProvider.configurationSchema().getFields().stream()
             .map(field -> field.getName())
             .collect(Collectors.toSet()));

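For context, a minimal hypothetical usage sketch of the new configuration fields follows; the builder() factory and the setBootstrapServers/setTopic setters are assumed from the existing class and are not part of this diff. The applyRedistributeSettings helper introduced above then maps each non-null field onto the corresponding KafkaIO.Read option.

    import org.apache.beam.sdk.io.kafka.KafkaReadSchemaTransformConfiguration;

    // Sketch only: redistribute the read across 32 keys with offset deduplication.
    KafkaReadSchemaTransformConfiguration configuration =
        KafkaReadSchemaTransformConfiguration.builder()
            .setBootstrapServers("localhost:9092") // assumed existing setter
            .setTopic("my-topic")                  // assumed existing setter
            .setRedistributed(true)                // new in this commit
            .setRedistributeNumKeys(32)            // new in this commit
            .setAllowDuplicates(false)             // new in this commit
            .setOffsetDeduplication(true)          // new in this commit
            .setRedistributeByRecordKey(false)     // new in this commit
            .build();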