This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 2af98303d3 [HUDI-4122] Fix NPE caused by adding kafka nodes (#5632) 2af98303d3 is described below commit 2af98303d3881e5d1da7d2e08f904b18f8b79488 Author: wangxianghu <wangxian...@apache.org> AuthorDate: Sat May 21 07:12:53 2022 +0400 [HUDI-4122] Fix NPE caused by adding kafka nodes (#5632) --- .../utilities/sources/helpers/KafkaOffsetGen.java | 47 ++++++++++++++++++---- 1 file changed, 39 insertions(+), 8 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index 564c5e2058..1abd2616b9 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -22,17 +22,17 @@ import org.apache.hudi.DataSourceUtils; import org.apache.hudi.common.config.ConfigProperty; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; -import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieNotSupportedException; - import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamerMetrics; +import org.apache.hudi.utilities.exception.HoodieDeltaStreamerException; import org.apache.hudi.utilities.sources.AvroKafkaSource; + import org.apache.kafka.clients.consumer.CommitFailedException; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetAndTimestamp; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.TimeoutException; @@ -48,6 +48,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -169,9 +170,14 @@ public class KafkaOffsetGen { .withDocumentation("Kafka topic name."); public static final ConfigProperty<String> KAFKA_CHECKPOINT_TYPE = ConfigProperty - .key("hoodie.deltastreamer.source.kafka.checkpoint.type") - .defaultValue("string") - .withDocumentation("Kafka chepoint type."); + .key("hoodie.deltastreamer.source.kafka.checkpoint.type") + .defaultValue("string") + .withDocumentation("Kafka checkpoint type."); + + public static final ConfigProperty<Long> KAFKA_FETCH_PARTITION_TIME_OUT = ConfigProperty + .key("hoodie.deltastreamer.source.kafka.fetch_partition.time.out") + .defaultValue(300 * 1000L) + .withDocumentation("Time out for fetching partitions. 5min by default"); public static final ConfigProperty<Boolean> ENABLE_KAFKA_COMMIT_OFFSET = ConfigProperty .key("hoodie.deltastreamer.source.kafka.enable.commit.offset") @@ -236,8 +242,7 @@ public class KafkaOffsetGen { if (!checkTopicExists(consumer)) { throw new HoodieException("Kafka topic:" + topicName + " does not exist"); } - List<PartitionInfo> partitionInfoList; - partitionInfoList = consumer.partitionsFor(topicName); + List<PartitionInfo> partitionInfoList = fetchPartitionInfos(consumer, topicName); Set<TopicPartition> topicPartitions = partitionInfoList.stream() .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet()); @@ -287,6 +292,32 @@ public class KafkaOffsetGen { return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents); } + /** + * Fetch partition infos for given topic. + * + * @param consumer + * @param topicName + */ + private List<PartitionInfo> fetchPartitionInfos(KafkaConsumer consumer, String topicName) { + long timeout = this.props.getLong(Config.KAFKA_FETCH_PARTITION_TIME_OUT.key(), Config.KAFKA_FETCH_PARTITION_TIME_OUT.defaultValue()); + long start = System.currentTimeMillis(); + + List<PartitionInfo> partitionInfos; + do { + partitionInfos = consumer.partitionsFor(topicName); + try { + TimeUnit.SECONDS.sleep(10); + } catch (InterruptedException e) { + LOG.error("Sleep failed while fetching partitions"); + } + } while (partitionInfos == null && (System.currentTimeMillis() <= (start + timeout))); + + if (partitionInfos == null) { + throw new HoodieDeltaStreamerException(String.format("Can not find metadata for topic %s from kafka cluster", topicName)); + } + return partitionInfos; + } + /** * Fetch checkpoint offsets for each partition. * @param consumer instance of {@link KafkaConsumer} to fetch offsets from.