This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2af98303d3 [HUDI-4122] Fix NPE caused by adding kafka nodes (#5632)
2af98303d3 is described below

commit 2af98303d3881e5d1da7d2e08f904b18f8b79488
Author: wangxianghu <wangxian...@apache.org>
AuthorDate: Sat May 21 07:12:53 2022 +0400

    [HUDI-4122] Fix NPE caused by adding kafka nodes (#5632)
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 47 ++++++++++++++++++----
 1 file changed, 39 insertions(+), 8 deletions(-)
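
In short: KafkaConsumer.partitionsFor() can return null while broker metadata is unavailable (for example, right after new Kafka nodes are added), and the previous code dereferenced that result directly. The patch retries the metadata lookup until a configurable deadline, exposed through the new hoodie.deltastreamer.source.kafka.fetch_partition.time.out property (default 300 * 1000 ms, i.e. 5 minutes). As a minimal sketch of how a job might override that default (the TypedProperties wiring here is an assumed usage, not part of this commit):

    import org.apache.hudi.common.config.TypedProperties;

    // Sketch: shorten the fetch-partition deadline from the 5-minute default
    // to 60 seconds; the key and default come from the diff below.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.deltastreamer.source.kafka.fetch_partition.time.out", "60000");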

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index 564c5e2058..1abd2616b9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -22,17 +22,17 @@ import org.apache.hudi.DataSourceUtils;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
-
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics;
+import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException;
 import org.apache.hudi.utilities.sources.AvroKafkaSource;
+
 import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -48,6 +48,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -169,9 +170,14 @@ public class KafkaOffsetGen {
             .withDocumentation("Kafka topic name.");
 
     public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty
-            .key("hoodie.deltastreamer.source.kafka.checkpoint.type")
-            .defaultValue("string")
-            .withDocumentation("Kafka chepoint type.");
+        .key("hoodie.deltastreamer.source.kafka.checkpoint.type")
+        .defaultValue("string")
+        .withDocumentation("Kafka checkpoint type.");
+
+    public static final ConfigProperty<Long> KAFKA_FETCH_PARTITION_TIME_OUT = ConfigProperty
+        .key("hoodie.deltastreamer.source.kafka.fetch_partition.time.out")
+        .defaultValue(300 * 1000L)
+        .withDocumentation("Time out for fetching partitions. 5min by default");
 
     public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty
             .key("hoodie.deltastreamer.source.kafka.enable.commit.offset")
@@ -236,8 +242,7 @@ public class KafkaOffsetGen {
       if (!checkTopicExists(consumer)) {
         throw new HoodieException("Kafka topic:" + topicName + " does not exist");
       }
-      List<PartitionInfo> partitionInfoList;
-      partitionInfoList = consumer.partitionsFor(topicName);
+      List<PartitionInfo> partitionInfoList = fetchPartitionInfos(consumer, topicName);
       Set<TopicPartition> topicPartitions = partitionInfoList.stream()
              .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
 
@@ -287,6 +292,32 @@ public class KafkaOffsetGen {
    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
   }
 
+  /**
+   * Fetch partition infos for the given topic.
+   *
+   * @param consumer  Kafka consumer used to look up the topic's metadata
+   * @param topicName name of the topic whose partitions are fetched
+   */
+  private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String topicName) {
+    long timeout = this.props.getLong(Config.KAFKA_FETCH_PARTITION_TIME_OUT.key(), Config.KAFKA_FETCH_PARTITION_TIME_OUT.defaultValue());
+    long start = System.currentTimeMillis();
+
+    List<PartitionInfo> partitionInfos;
+    do {
+      partitionInfos = consumer.partitionsFor(topicName);
+      try {
+        TimeUnit.SECONDS.sleep(10);
+      } catch (InterruptedException e) {
+        LOG.error("Sleep failed while fetching partitions");
+      }
+    } while (partitionInfos == null && (System.currentTimeMillis() <= (start + timeout)));
+
+    if (partitionInfos == null) {
+      throw new HoodieDeltaStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", topicName));
+    }
+    return partitionInfos;
+  }
+
   /**
    * Fetch checkpoint offsets for each partition.
    * @param consumer instance of {@link KafkaConsumer} to fetch offsets from.
