This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 347e297  [HUDI-596] Close KafkaConsumer every time (#1303)
347e297 is described below

commit 347e297ac19ed55172e84e13075e19ce060954c6
Author: dengziming <dengziming1...@gmail.com>
AuthorDate: Tue Feb 4 15:42:21 2020 +0800

    [HUDI-596] Close KafkaConsumer every time (#1303)
---
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 50 +++++++++++-----------
 1 file changed, 26 insertions(+), 24 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index ed5e4e9..a92a441 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -172,33 +172,35 @@ public class KafkaOffsetGen {
   public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr, long sourceLimit) {
 
     // Obtain current metadata for the topic
-    KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
-    List<PartitionInfo> partitionInfoList;
-    partitionInfoList = consumer.partitionsFor(topicName);
-    Set<TopicPartition> topicPartitions = partitionInfoList.stream()
-            .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
-
-    // Determine the offset ranges to read from
     Map<TopicPartition, Long> fromOffsets;
-    if (lastCheckpointStr.isPresent()) {
-      fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
-    } else {
-      KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
-              .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
-      switch (autoResetValue) {
-        case EARLIEST:
-          fromOffsets = consumer.beginningOffsets(topicPartitions);
-          break;
-        case LATEST:
-          fromOffsets = consumer.endOffsets(topicPartitions);
-          break;
-        default:
-          throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
+    Map<TopicPartition, Long> toOffsets;
+    try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+      List<PartitionInfo> partitionInfoList;
+      partitionInfoList = consumer.partitionsFor(topicName);
+      Set<TopicPartition> topicPartitions = partitionInfoList.stream()
+              .map(x -> new TopicPartition(x.topic(), x.partition())).collect(Collectors.toSet());
+
+      // Determine the offset ranges to read from
+      if (lastCheckpointStr.isPresent()) {
+        fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr, topicPartitions);
+      } else {
+        KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
+                .valueOf(props.getString("auto.offset.reset", Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
+        switch (autoResetValue) {
+          case EARLIEST:
+            fromOffsets = consumer.beginningOffsets(topicPartitions);
+            break;
+          case LATEST:
+            fromOffsets = consumer.endOffsets(topicPartitions);
+            break;
+          default:
+            throw new HoodieNotSupportedException("Auto reset value must be one of 'smallest' or 'largest' ");
+        }
       }
-    }
 
-    // Obtain the latest offsets.
-    Map<TopicPartition, Long> toOffsets = consumer.endOffsets(topicPartitions);
+      // Obtain the latest offsets.
+      toOffsets = consumer.endOffsets(topicPartitions);
+    }
 
     // Come up with final set of OffsetRanges to read (account for new partitions, limit number of events)
     long maxEventsToReadFromKafka = props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,
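
For reference, the pattern this commit adopts: KafkaConsumer implements AutoCloseable, so wrapping it in try-with-resources guarantees close() runs even when partitionsFor() or an offset lookup throws, rather than leaking the consumer's sockets and broker connections on every call. Below is a minimal standalone sketch of the same pattern; the broker address, topic name, and class name are placeholders for illustration, not values taken from the commit.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerCloseSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    // try-with-resources: consumer.close() runs on normal exit and on exception,
    // mirroring how getNextOffsetRanges now scopes its KafkaConsumer.
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.partitionsFor("example-topic") // placeholder topic; metadata-only call
              .forEach(System.out::println);
    }
  }
}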
