This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 37ad3196d81 Fixes a breakage related to Kafka upgrade (#32262)
37ad3196d81 is described below

commit 37ad3196d81c6dc71f8e1e516ed6eda072b98752
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Wed Aug 21 09:42:09 2024 -0700

    Fixes a breakage related to Kafka upgrade (#32262)
---
 .../sdk/io/kafka/upgrade/KafkaIOTranslation.java   | 28 ++++++++++++----------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index db7b172170a..841236969d2 100644
--- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -332,19 +332,21 @@ public class KafkaIOTranslation {
           transform = transform.withMaxNumRecords(maxNumRecords);
         }
 
-        Boolean isRedistributed = configRow.getBoolean("redistribute");
-        if (isRedistributed != null && isRedistributed) {
-          transform = transform.withRedistribute();
-          Integer redistributeNumKeys =
-              configRow.getValue("redistribute_num_keys") == null
-                  ? Integer.valueOf(0)
-                  : configRow.getInt32("redistribute_num_keys");
-          if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) {
-            transform = transform.withRedistributeNumKeys(redistributeNumKeys);
-          }
-          Boolean allowDuplicates = configRow.getBoolean("allows_duplicates");
-          if (allowDuplicates != null && allowDuplicates) {
-            transform = transform.withAllowDuplicates(allowDuplicates);
+        if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.58.0") >= 0) {
+          Boolean isRedistributed = configRow.getBoolean("redistribute");
+          if (isRedistributed != null && isRedistributed) {
+            transform = transform.withRedistribute();
+            Integer redistributeNumKeys =
+                configRow.getValue("redistribute_num_keys") == null
+                    ? Integer.valueOf(0)
+                    : configRow.getInt32("redistribute_num_keys");
+            if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) {
+              transform = transform.withRedistributeNumKeys(redistributeNumKeys);
+            }
+            Boolean allowDuplicates = configRow.getBoolean("allows_duplicates");
+            if (allowDuplicates != null && allowDuplicates) {
+              transform = transform.withAllowDuplicates(allowDuplicates);
+            }
           }
         }
         Duration maxReadTime = configRow.getValue("max_read_time");

Reply via email to