This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 37ad3196d81 Fixes a breakage related to Kafka upgrade (#32262)
37ad3196d81 is described below
commit 37ad3196d81c6dc71f8e1e516ed6eda072b98752
Author: Chamikara Jayalath <[email protected]>
AuthorDate: Wed Aug 21 09:42:09 2024 -0700
Fixes a breakage related to Kafka upgrade (#32262)
---
.../sdk/io/kafka/upgrade/KafkaIOTranslation.java | 28 ++++++++++++----------
1 file changed, 15 insertions(+), 13 deletions(-)
diff --git a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
index db7b172170a..841236969d2 100644
--- a/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
+++ b/sdks/java/io/kafka/upgrade/src/main/java/org/apache/beam/sdk/io/kafka/upgrade/KafkaIOTranslation.java
@@ -332,19 +332,21 @@ public class KafkaIOTranslation {
transform = transform.withMaxNumRecords(maxNumRecords);
}
- Boolean isRedistributed = configRow.getBoolean("redistribute");
- if (isRedistributed != null && isRedistributed) {
- transform = transform.withRedistribute();
- Integer redistributeNumKeys =
- configRow.getValue("redistribute_num_keys") == null
- ? Integer.valueOf(0)
- : configRow.getInt32("redistribute_num_keys");
- if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) {
- transform = transform.withRedistributeNumKeys(redistributeNumKeys);
- }
- Boolean allowDuplicates = configRow.getBoolean("allows_duplicates");
- if (allowDuplicates != null && allowDuplicates) {
- transform = transform.withAllowDuplicates(allowDuplicates);
+ if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.58.0") >= 0) {
+ Boolean isRedistributed = configRow.getBoolean("redistribute");
+ if (isRedistributed != null && isRedistributed) {
+ transform = transform.withRedistribute();
+ Integer redistributeNumKeys =
+ configRow.getValue("redistribute_num_keys") == null
+ ? Integer.valueOf(0)
+ : configRow.getInt32("redistribute_num_keys");
+ if (redistributeNumKeys != null && !redistributeNumKeys.equals(0)) {
+ transform = transform.withRedistributeNumKeys(redistributeNumKeys);
+ }
+ Boolean allowDuplicates = configRow.getBoolean("allows_duplicates");
+ if (allowDuplicates != null && allowDuplicates) {
+ transform = transform.withAllowDuplicates(allowDuplicates);
+ }
}
}
Duration maxReadTime = configRow.getValue("max_read_time");