This is an automated email from the ASF dual-hosted git repository. vinoth pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push: new 347e297 [HUDI-596] Close KafkaConsumer every time (#1303) 347e297 is described below commit 347e297ac19ed55172e84e13075e19ce060954c6 Author: dengziming <dengziming1...@gmail.com> AuthorDate: Tue Feb 4 15:42:21 2020 +0800 [HUDI-596] Close KafkaConsumer every time (#1303) --- .../utilities/sources/helpers/KafkaOffsetGen.java | 50 +++++++++++----------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java index ed5e4e9..a92a441 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java @@ -172,33 +172,35 @@ public class KafkaOffsetGen { public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) { // Obtain current metadata for the topic - KafkaConsumer consumer = new KafkaConsumer(kafkaParams); - List<PartitionInfo> partitionInfoList; - partitionInfoList = consumer.partitionsFor(topicName); - Set<TopicPartition> topicPartitions = partitionInfoList.stream() - .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet()); - - // Determine the offset ranges to read from Map<TopicPartition, Long> fromOffsets; - if (lastCheckpointStr.isPresent()) { - fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions); - } else { - KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies - .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); - switch (autoResetValue) { - case EARLIEST: - fromOffsets = consumer.beginningOffsets(topicPartitions); - break; - case LATEST: - fromOffsets = consumer.endOffsets(topicPartitions); - break; - default: - throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' "); + Map<TopicPartition, Long> toOffsets; + try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) { + List<PartitionInfo> partitionInfoList; + partitionInfoList = consumer.partitionsFor(topicName); + Set<TopicPartition> topicPartitions = partitionInfoList.stream() + .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet()); + + // Determine the offset ranges to read from + if (lastCheckpointStr.isPresent()) { + fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions); + } else { + KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies + .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase()); + switch (autoResetValue) { + case EARLIEST: + fromOffsets = consumer.beginningOffsets(topicPartitions); + break; + case LATEST: + fromOffsets = consumer.endOffsets(topicPartitions); + break; + default: + throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' "); + } } - } - // Obtain the latest offsets. - Map<TopicPartition, Long> toOffsets = consumer.endOffsets(topicPartitions); + // Obtain the latest offsets. + toOffsets = consumer.endOffsets(topicPartitions); + } // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events) long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,