This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d43730a65d1 [HUDI-6019] support config minPartitions when reading from kafka (#8376)
d43730a65d1 is described below

commit d43730a65d18816bc779b906baf224024e855d72
Author: kongwei <kong...@pku.edu.cn>
AuthorDate: Thu Apr 27 10:17:57 2023 +0800

    [HUDI-6019] support config minPartitions when reading from kafka (#8376)
    
    Co-authored-by: wei.kong <wei.k...@shopee.com>
---
 .../hudi/utilities/config/KafkaSourceConfig.java   |  11 ++
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 100 +++++++++---
 .../sources/helpers/TestCheckpointUtils.java       | 175 ++++++++++++++++++---
 .../sources/helpers/TestKafkaOffsetGen.java        |  71 +++++++++
 4 files changed, 308 insertions(+), 49 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index 785d254ed8d..2a945434eff 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -68,6 +68,17 @@ public class KafkaSourceConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Maximum number of records obtained in each batch.");
 
+  // the documentation is inspired by the minPartitions option of Kafka Structured Streaming
+  public static final ConfigProperty<Long> KAFKA_SOURCE_MIN_PARTITIONS = ConfigProperty
+          .key(PREFIX + "minPartitions")
+          .defaultValue(0L)
+          .withDocumentation("Desired minimum number of partitions to read from Kafka. "
+              + "By default, Hudi has a 1-1 mapping of topicPartitions to Hudi partitions consuming from Kafka. "
+              + "If this option is set to a value greater than the number of topicPartitions, "
+              + "Hudi will divvy up large Kafka partitions into smaller pieces. "
+              + "Please note that this configuration is like a hint: the number of input tasks will be approximately minPartitions. "
+              + "It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data.");
+
   public static final ConfigProperty<String> KAFKA_TOPIC_NAME = ConfigProperty
       .key(PREFIX + "topic")
       .noDefaultValue()
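
For reference, here is a minimal sketch of how the new knob can be supplied to KafkaOffsetGen through TypedProperties, mirroring the usage in TestKafkaOffsetGen further below. The broker address and topic name are placeholders, and the key is referenced through the ConfigProperty because its fully qualified name depends on PREFIX, which is outside this hunk; the usual Kafka consumer properties (deserializers, auto.offset.reset, group.id) are assumed to be set elsewhere.

    // Illustrative sketch only (not part of the commit); placeholder broker and topic.
    import org.apache.hudi.common.config.TypedProperties;
    import org.apache.hudi.utilities.config.KafkaSourceConfig;
    import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen;

    public class MinPartitionsConfigExample {
      public static KafkaOffsetGen newOffsetGen() {
        TypedProperties props = new TypedProperties();
        props.put("bootstrap.servers", "localhost:9092");                      // placeholder broker
        props.put(KafkaSourceConfig.KAFKA_TOPIC_NAME.key(), "example_topic");  // placeholder topic
        // Request that each batch be split into at least ~4 offset ranges (read tasks).
        props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 4L);
        return new KafkaOffsetGen(props);
      }
    }
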
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index e53cab94538..ca6f08012ad 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -40,6 +40,7 @@ import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -61,6 +62,8 @@ public class KafkaOffsetGen {
 
   private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetGen.class);
   private static final String METRIC_NAME_KAFKA_DELAY_COUNT = "kafkaDelayCount";
+  private static final Comparator<OffsetRange> SORT_BY_PARTITION = Comparator.comparing(OffsetRange::partition);
+
   public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
 
   /**
@@ -91,6 +94,8 @@ public class KafkaOffsetGen {
      * Format: topic1,0:offset0,1:offset1,2:offset2, .....
      */
     public static String offsetsToStr(OffsetRange[] ranges) {
+      // merge the ranges by partition so that each topic partition maps to a single offset range in the checkpoint.
+      ranges = mergeRangesByTopicPartition(ranges);
       StringBuilder sb = new StringBuilder();
       // at least 1 partition will be present.
       sb.append(ranges[0].topic() + ",");
@@ -107,48 +112,87 @@ public class KafkaOffsetGen {
      * @param numEvents maximum number of events to read.
      */
     public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap,
-                                                    Map<TopicPartition, Long> toOffsetMap, long numEvents) {
-
-      Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
-
-      // Create initial offset ranges for each 'to' partition, with from = to offsets.
+                                                    Map<TopicPartition, Long> toOffsetMap,
+                                                    long numEvents,
+                                                    long minPartitions) {
+      // Create initial offset ranges for each 'to' partition, with default from = 0 offsets.
       OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> {
         long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
-        return OffsetRange.create(tp, fromOffset, fromOffset);
+        return OffsetRange.create(tp, fromOffset, toOffsetMap.get(tp));
       })
-          .sorted(byPartition)
+          .sorted(SORT_BY_PARTITION)
           .collect(Collectors.toList())
           .toArray(new OffsetRange[toOffsetMap.size()]);
+      LOG.debug("numEvents {}, minPartitions {}, ranges {}", numEvents, 
minPartitions, ranges);
 
+      boolean needSplitToMinPartitions = minPartitions > toOffsetMap.size();
+      long totalEvents = totalNewMessages(ranges);
       long allocedEvents = 0;
       Set<Integer> exhaustedPartitions = new HashSet<>();
+      List<OffsetRange> finalRanges = new ArrayList<>();
+      // choose the actualNumEvents with min(totalEvents, numEvents)
+      long actualNumEvents = Math.min(totalEvents, numEvents);
+
       // keep going until we have events to allocate and partitions still not exhausted.
       while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
-        long remainingEvents = numEvents - allocedEvents;
-        long eventsPerPartition =
-                (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
-
         // Allocate the remaining events to non-exhausted partitions, in round robin fashion
+        Set<Integer> allocatedPartitionsThisLoop = new HashSet<>(exhaustedPartitions);
         for (int i = 0; i < ranges.length; i++) {
+          long remainingEvents = actualNumEvents - allocedEvents;
+          long remainingPartitions = toOffsetMap.size() - allocatedPartitionsThisLoop.size();
+          // if we need to split into minPartitions, recalculate the remainingPartitions
+          if (needSplitToMinPartitions) {
+            remainingPartitions = minPartitions - finalRanges.size();
+          }
+          long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) / remainingPartitions);
+
+
           OffsetRange range = ranges[i];
-          if (!exhaustedPartitions.contains(range.partition())) {
-            long toOffsetMax = toOffsetMap.get(range.topicPartition());
-            long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
-            if (toOffset == toOffsetMax) {
-              exhaustedPartitions.add(range.partition());
-            }
-            allocedEvents += toOffset - range.untilOffset();
-            // We need recompute toOffset if allocedEvents larger than numEvents.
-            if (allocedEvents > numEvents) {
-              long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents));
-              toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
-            }
-            ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
+          if (exhaustedPartitions.contains(range.partition())) {
+            continue;
+          }
+
+          long toOffset = Math.min(range.untilOffset(), range.fromOffset() + eventsPerPartition);
+          if (toOffset == range.untilOffset()) {
+            exhaustedPartitions.add(range.partition());
           }
+          allocedEvents += toOffset - range.fromOffset();
+          // We need to recompute toOffset if allocedEvents is larger than actualNumEvents.
+          if (allocedEvents > actualNumEvents) {
+            long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents - allocedEvents));
+            toOffset = Math.min(range.untilOffset(), toOffset + offsetsToAdd);
+          }
+          OffsetRange thisRange = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
+          finalRanges.add(thisRange);
+          ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset() + thisRange.count(), range.untilOffset());
+          allocatedPartitionsThisLoop.add(range.partition());
+          allocatedPartitionsThisLoop.add(range.partition());
         }
       }
 
-      return ranges;
+      if (!needSplitToMinPartitions) {
+        LOG.debug("final ranges merged by topic partition {}", 
Arrays.toString(mergeRangesByTopicPartition(finalRanges.toArray(new 
OffsetRange[0]))));
+        return mergeRangesByTopicPartition(finalRanges.toArray(new 
OffsetRange[0]));
+      }
+      finalRanges.sort(SORT_BY_PARTITION);
+      LOG.debug("final ranges {}", Arrays.toString(finalRanges.toArray(new 
OffsetRange[0])));
+      return finalRanges.toArray(new OffsetRange[0]);
+    }
+
+    /**
+     * Merge ranges by topic partition, because we need to maintain the checkpoint with one offset range per topic partition.
+     * @param oldRanges to merge
+     * @return ranges merged by partition
+     */
+    public static OffsetRange[] mergeRangesByTopicPartition(OffsetRange[] oldRanges) {
+      List<OffsetRange> newRanges = new ArrayList<>();
+      Map<TopicPartition, List<OffsetRange>> tpOffsets = Arrays.stream(oldRanges).collect(Collectors.groupingBy(OffsetRange::topicPartition));
+      for (Map.Entry<TopicPartition, List<OffsetRange>> entry : tpOffsets.entrySet()) {
+        long from = entry.getValue().stream().map(OffsetRange::fromOffset).min(Long::compare).get();
+        long until = entry.getValue().stream().map(OffsetRange::untilOffset).max(Long::compare).get();
+        newRanges.add(OffsetRange.create(entry.getKey(), from, until));
+      }
+      // make sure the resulting ranges are ordered by partition
+      newRanges.sort(SORT_BY_PARTITION);
+      return newRanges.toArray(new OffsetRange[0]);
     }
 
     public static long totalNewMessages(OffsetRange[] ranges) {
@@ -242,7 +286,11 @@ public class KafkaOffsetGen {
      throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
     }
 
-    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
+    long minPartitions = props.getLong(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(),
+            KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.defaultValue());
+    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+
+    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
   }
 
   /**
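
To make the splitting behavior concrete, below is a small worked sketch based on the cases asserted in TestCheckpointUtils further below: a single topic partition with 1000 new messages, a 300-event limit, and minPartitions = 3 produces three sub-ranges over the same partition, and offsetsToStr() merges them back to a single checkpoint entry per topic partition. The topic name is a placeholder.

    // Worked sketch only; the values mirror the assertions in TestCheckpointUtils.
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    public class MinPartitionsSplitExample {
      public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("example_topic", 0);  // placeholder topic
        Map<TopicPartition, Long> from = new HashMap<>();
        Map<TopicPartition, Long> to = new HashMap<>();
        from.put(tp, 0L);
        to.put(tp, 1000L);

        // 300-event limit split across minPartitions = 3 read tasks:
        // -> [0, 100), [100, 200), [200, 300) on the same topic partition.
        OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(from, to, 300, 3);

        // The checkpoint string still carries one offset per topic partition: "example_topic,0:300"
        System.out.println(CheckpointUtils.offsetsToStr(ranges));
      }
    }
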
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
index e8494fd9526..49e27d0191b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
@@ -18,10 +18,11 @@
 
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+
 import org.apache.kafka.common.TopicPartition;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.junit.jupiter.api.Test;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,8 +40,8 @@ public class TestCheckpointUtils {
   @Test
   public void testStringToOffsets() {
     OffsetRange[] ranges =
-            CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-                    makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0);
     String checkpointStr = CheckpointUtils.offsetsToStr(ranges);
     Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
     assertEquals(2, offsetMap.size());
@@ -57,27 +58,35 @@ public class TestCheckpointUtils {
   @Test
   public void testOffsetToString() {
     OffsetRange[] ranges =
-            CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-                    makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0);
     assertEquals(TEST_TOPIC_NAME + ",0:300000,1:350000", CheckpointUtils.offsetsToStr(ranges));
+
+    ranges = new OffsetRange[] {
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 100, 200),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 200, 300)};
+    assertEquals(TEST_TOPIC_NAME + ",0:200,1:300", 
CheckpointUtils.offsetsToStr(ranges));
   }
 
   @Test
-  public void testComputeOffsetRanges() {
+  public void testComputeOffsetRangesWithoutMinPartitions() {
     // test totalNewMessages()
-    long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
-            OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
+    long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[] {OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
     assertEquals(200, totalMsgs);
 
     // should consume all the full data
     OffsetRange[] ranges =
-            CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-                    makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0);
     assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
 
     // should only consume upto limit
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 10000);
+    ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 10000, 0);
     assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(200000, ranges[0].fromOffset());
     assertEquals(205000, ranges[0].untilOffset());
@@ -85,35 +94,155 @@ public class TestCheckpointUtils {
     assertEquals(255000, ranges[1].untilOffset());
 
     // should also consume from new partitions.
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}), 1000000L);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {300000, 350000, 100000}), 1000000L, 0);
     assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(3, ranges.length);
 
     // for skewed offsets, does not starve any partition & can catch up
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 100000);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 100000, 0);
     assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(10, ranges[0].count());
     assertEquals(89990, ranges[1].count());
     assertEquals(10000, ranges[2].count());
 
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 1000000);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 1000000, 0);
     assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(10, ranges[0].count());
     assertEquals(100000, ranges[1].count());
     assertEquals(10000, ranges[2].count());
 
     // not all partitions consume same entries.
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{0, 0, 0, 0, 0}),
-            makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100, 1000, 1000, 1000, 1000}), 1001);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
+        makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001, 0);
     assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(100, ranges[0].count());
     assertEquals(226, ranges[1].count());
-    assertEquals(226, ranges[2].count());
-    assertEquals(226, ranges[3].count());
-    assertEquals(223, ranges[4].count());
+    assertEquals(225, ranges[2].count());
+    assertEquals(225, ranges[3].count());
+    assertEquals(225, ranges[4].count());
+  }
+
+  @Test
+  public void testComputeOffsetRangesWithMinPartitions() {
+    // default(0) minPartitions
+    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {1000}), 300, 0);
+    assertEquals(1, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(300, ranges[0].untilOffset());
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {0, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {1000, 1000}), 300, 0);
+    assertEquals(2, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(150, ranges[0].untilOffset());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(150, ranges[1].untilOffset());
+
+    // N TopicPartitions to N offset ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2}, new long[] {0, 0, 0}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {1000, 1000, 1000}), 300, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(100, ranges[1].untilOffset());
+    assertEquals(0, ranges[2].fromOffset());
+    assertEquals(100, ranges[2].untilOffset());
+
+    // 1 TopicPartition to N offset ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {1000}), 300, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(100, ranges[1].fromOffset());
+    assertEquals(200, ranges[1].untilOffset());
+    assertEquals(200, ranges[2].fromOffset());
+    assertEquals(300, ranges[2].untilOffset());
+
+    // N skewed TopicPartitions to M offset ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {0, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {100, 500}), 600, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(250, ranges[1].untilOffset());
+    assertEquals(250, ranges[2].fromOffset());
+    assertEquals(500, ranges[2].untilOffset());
+
+    // range inexact multiple of minPartitions
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {100}), 600, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(34, ranges[0].untilOffset());
+    assertEquals(34, ranges[1].fromOffset());
+    assertEquals(67, ranges[1].untilOffset());
+    assertEquals(67, ranges[2].fromOffset());
+    assertEquals(100, ranges[2].untilOffset());
+
+    // do not ignore empty ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {100, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {100, 600}), 600, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].partition());
+    assertEquals(100, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(1, ranges[1].partition());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(300, ranges[1].untilOffset());
+    assertEquals(1, ranges[2].partition());
+    assertEquals(300, ranges[2].fromOffset());
+    assertEquals(600, ranges[2].untilOffset());
+
+    // all empty ranges, do not ignore empty ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {100, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {100, 0}), 600, 3);
+    assertEquals(0, CheckpointUtils.totalNewMessages(ranges));
+    assertEquals(2, ranges.length);
+    assertEquals(0, ranges[0].partition());
+    assertEquals(100, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(1, ranges[1].partition());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(0, ranges[1].untilOffset());
+
+    // minPartitions more than maxEvents
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {2}), 600, 3);
+    assertEquals(2, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(1, ranges[0].untilOffset());
+    assertEquals(1, ranges[1].fromOffset());
+    assertEquals(2, ranges[1].untilOffset());
+  }
+
+  @Test
+  public void testSplitAndMergeRanges() {
+    OffsetRange range = OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100);
+    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {0, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {100, 500}), 600, 4);
+    assertEquals(4, ranges.length);
+    OffsetRange[] mergedRanges = CheckpointUtils.mergeRangesByTopicPartition(ranges);
+    assertEquals(2, mergedRanges.length);
+    assertEquals(0, mergedRanges[0].partition());
+    assertEquals(0, mergedRanges[0].fromOffset());
+    assertEquals(100, mergedRanges[0].untilOffset());
+    assertEquals(1, mergedRanges[1].partition());
+    assertEquals(0, mergedRanges[1].fromOffset());
+    assertEquals(500, mergedRanges[1].untilOffset());
+
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {1000}), 300, 3);
+    assertEquals(3, ranges.length);
+    mergedRanges = CheckpointUtils.mergeRangesByTopicPartition(ranges);
+    assertEquals(1, mergedRanges.length);
+    assertEquals(0, mergedRanges[0].fromOffset());
+    assertEquals(300, mergedRanges[0].untilOffset());
   }
 
   private static Map<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index d00b1f92b1f..e7b5a0ab5ee 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
 
@@ -170,6 +171,76 @@ public class TestKafkaOffsetGen {
     assertEquals(500, nextOffsetRanges[1].untilOffset());
   }
 
+  @Test
+  public void testGetNextOffsetRangesWithMinPartitionsForSinglePartition() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(testTopicName, 1);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    TypedProperties props = getConsumerConfigs("earliest", "string");
+
+    // default no minPartition set
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(300, nextOffsetRanges[0].untilOffset());
+
+    props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 2L);
+    kafkaOffsetGen = new KafkaOffsetGen(props);
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(150, nextOffsetRanges[0].untilOffset());
+    assertEquals(150, nextOffsetRanges[1].fromOffset());
+    assertEquals(300, nextOffsetRanges[1].untilOffset());
+  }
+
+  @Test
+  public void testGetNextOffsetRangesWithMinPartitionsForMultiPartition() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(testTopicName, 2);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    TypedProperties props = getConsumerConfigs("earliest", "string");
+
+    // default no minPartition or minPartition less than TopicPartitions
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(2, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].partition());
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(150, nextOffsetRanges[0].untilOffset());
+    assertEquals(1, nextOffsetRanges[1].partition());
+    assertEquals(0, nextOffsetRanges[1].fromOffset());
+    assertEquals(150, nextOffsetRanges[1].untilOffset());
+
+    props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 1L);
+    kafkaOffsetGen = new KafkaOffsetGen(props);
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(2, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].partition());
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(150, nextOffsetRanges[0].untilOffset());
+    assertEquals(1, nextOffsetRanges[1].partition());
+    assertEquals(0, nextOffsetRanges[1].fromOffset());
+    assertEquals(150, nextOffsetRanges[1].untilOffset());
+
+    // minPartition more than TopicPartitions
+    props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 4L);
+    kafkaOffsetGen = new KafkaOffsetGen(props);
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(4, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].partition());
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(75, nextOffsetRanges[0].untilOffset());
+    assertEquals(0, nextOffsetRanges[1].partition());
+    assertEquals(75, nextOffsetRanges[1].fromOffset());
+    assertEquals(150, nextOffsetRanges[1].untilOffset());
+    assertEquals(1, nextOffsetRanges[2].partition());
+    assertEquals(0, nextOffsetRanges[2].fromOffset());
+    assertEquals(75, nextOffsetRanges[2].untilOffset());
+    assertEquals(1, nextOffsetRanges[3].partition());
+    assertEquals(75, nextOffsetRanges[3].fromOffset());
+    assertEquals(150, nextOffsetRanges[3].untilOffset());
+  }
+
   @Test
   public void testCheckTopicExists() {
     TypedProperties props = getConsumerConfigs("latest", "string");
