This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new d43730a65d1  [HUDI-6019] support config minPartitions when reading from kafka (#8376)
d43730a65d1 is described below

commit d43730a65d18816bc779b906baf224024e855d72
Author: kongwei <kong...@pku.edu.cn>
AuthorDate: Thu Apr 27 10:17:57 2023 +0800

    [HUDI-6019] support config minPartitions when reading from kafka (#8376)

    Co-authored-by: wei.kong <wei.k...@shopee.com>
---
 .../hudi/utilities/config/KafkaSourceConfig.java   |  11 ++
 .../utilities/sources/helpers/KafkaOffsetGen.java  | 100 +++++++++--
 .../sources/helpers/TestCheckpointUtils.java       | 175 ++++++++++++++++++---
 .../sources/helpers/TestKafkaOffsetGen.java        |  71 +++++++++
 4 files changed, 308 insertions(+), 49 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
index 785d254ed8d..2a945434eff 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/config/KafkaSourceConfig.java
@@ -68,6 +68,17 @@ public class KafkaSourceConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("Maximum number of records obtained in each batch.");
 
+  // the documentation is inspired by the minPartitions definition of Kafka Structured Streaming
+  public static final ConfigProperty<Long> KAFKA_SOURCE_MIN_PARTITIONS = ConfigProperty
+      .key(PREFIX + "minPartitions")
+      .defaultValue(0L)
+      .withDocumentation("Desired minimum number of partitions to read from Kafka. "
+          + "By default, Hudi has a 1-1 mapping of topicPartitions to Hudi partitions consuming from Kafka. "
+          + "If this option is set to a value greater than the number of topicPartitions, "
+          + "Hudi will divvy up large Kafka partitions into smaller pieces. "
+          + "Please note that this configuration is like a hint: the number of input tasks will be approximately minPartitions. "
+          + "It can be less or more depending on rounding errors or Kafka partitions that didn't receive any new data.");
+
   public static final ConfigProperty<String> KAFKA_TOPIC_NAME = ConfigProperty
       .key(PREFIX + "topic")
       .noDefaultValue()
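For illustration only, not part of the committed diff: a minimal sketch of turning the new knob on, using only the classes and calls this patch itself exercises. The fully-qualified key is PREFIX + "minPartitions"; assuming PREFIX resolves to "hoodie.deltastreamer.source.kafka.", the key would read "hoodie.deltastreamer.source.kafka.minPartitions" (that prefix value is an assumption here).

    TypedProperties props = getConsumerConfigs("earliest", "string"); // test helper, see TestKafkaOffsetGen below
    // hint: split this topic's partitions into roughly 4 read tasks per batch
    props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 4L);
    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
    OffsetRange[] ranges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);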
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index e53cab94538..ca6f08012ad 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -40,6 +40,7 @@ import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -61,6 +62,8 @@ public class KafkaOffsetGen {
   private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetGen.class);
   private static final String METRIC_NAME_KAFKA_DELAY_COUNT = "kafkaDelayCount";
 
+  private static final Comparator<OffsetRange> SORT_BY_PARTITION = Comparator.comparing(OffsetRange::partition);
+
   public static final String KAFKA_CHECKPOINT_TYPE_TIMESTAMP = "timestamp";
 
   /**
@@ -91,6 +94,8 @@ public class KafkaOffsetGen {
      * Format: topic1,0:offset0,1:offset1,2:offset2, .....
      */
     public static String offsetsToStr(OffsetRange[] ranges) {
+      // merge the ranges by partition to maintain one offset range per topic partition.
+      ranges = mergeRangesByTopicPartition(ranges);
       StringBuilder sb = new StringBuilder();
       // at least 1 partition will be present.
       sb.append(ranges[0].topic() + ",");
@@ -107,48 +112,87 @@ public class KafkaOffsetGen {
      * @param numEvents maximum number of events to read.
      */
     public static OffsetRange[] computeOffsetRanges(Map<TopicPartition, Long> fromOffsetMap,
-                                                    Map<TopicPartition, Long> toOffsetMap, long numEvents) {
-
-      Comparator<OffsetRange> byPartition = Comparator.comparing(OffsetRange::partition);
-
-      // Create initial offset ranges for each 'to' partition, with from = to offsets.
+                                                    Map<TopicPartition, Long> toOffsetMap,
+                                                    long numEvents,
+                                                    long minPartitions) {
+      // Create initial offset ranges for each 'to' partition, with default from = 0 offsets.
       OffsetRange[] ranges = toOffsetMap.keySet().stream().map(tp -> {
         long fromOffset = fromOffsetMap.getOrDefault(tp, 0L);
-        return OffsetRange.create(tp, fromOffset, fromOffset);
+        return OffsetRange.create(tp, fromOffset, toOffsetMap.get(tp));
       })
-          .sorted(byPartition)
+          .sorted(SORT_BY_PARTITION)
           .collect(Collectors.toList())
           .toArray(new OffsetRange[toOffsetMap.size()]);
+      LOG.debug("numEvents {}, minPartitions {}, ranges {}", numEvents, minPartitions, ranges);
+
+      boolean needSplitToMinPartitions = minPartitions > toOffsetMap.size();
+      long totalEvents = totalNewMessages(ranges);
       long allocedEvents = 0;
       Set<Integer> exhaustedPartitions = new HashSet<>();
+      List<OffsetRange> finalRanges = new ArrayList<>();
+      // choose the actualNumEvents as min(totalEvents, numEvents)
+      long actualNumEvents = Math.min(totalEvents, numEvents);
+
       // keep going until we have events to allocate and partitions still not exhausted.
       while (allocedEvents < numEvents && exhaustedPartitions.size() < toOffsetMap.size()) {
-        long remainingEvents = numEvents - allocedEvents;
-        long eventsPerPartition =
-            (long) Math.ceil((1.0 * remainingEvents) / (toOffsetMap.size() - exhaustedPartitions.size()));
-
         // Allocate the remaining events to non-exhausted partitions, in round robin fashion
+        Set<Integer> allocatedPartitionsThisLoop = new HashSet<>(exhaustedPartitions);
         for (int i = 0; i < ranges.length; i++) {
+          long remainingEvents = actualNumEvents - allocedEvents;
+          long remainingPartitions = toOffsetMap.size() - allocatedPartitionsThisLoop.size();
+          // if we need to split into minPartitions, recalculate the remainingPartitions
+          if (needSplitToMinPartitions) {
+            remainingPartitions = minPartitions - finalRanges.size();
+          }
+          long eventsPerPartition = (long) Math.ceil((1.0 * remainingEvents) / remainingPartitions);
+
           OffsetRange range = ranges[i];
-          if (!exhaustedPartitions.contains(range.partition())) {
-            long toOffsetMax = toOffsetMap.get(range.topicPartition());
-            long toOffset = Math.min(toOffsetMax, range.untilOffset() + eventsPerPartition);
-            if (toOffset == toOffsetMax) {
-              exhaustedPartitions.add(range.partition());
-            }
-            allocedEvents += toOffset - range.untilOffset();
-            // We need to recompute toOffset if allocedEvents is larger than numEvents.
-            if (allocedEvents > numEvents) {
-              long offsetsToAdd = Math.min(eventsPerPartition, (numEvents - allocedEvents));
-              toOffset = Math.min(toOffsetMax, toOffset + offsetsToAdd);
-            }
-            ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
+          if (exhaustedPartitions.contains(range.partition())) {
+            continue;
           }
+
+          long toOffset = Math.min(range.untilOffset(), range.fromOffset() + eventsPerPartition);
+          if (toOffset == range.untilOffset()) {
+            exhaustedPartitions.add(range.partition());
+          }
+          allocedEvents += toOffset - range.fromOffset();
+          // We need to recompute toOffset if allocedEvents is larger than actualNumEvents.
+          if (allocedEvents > actualNumEvents) {
+            long offsetsToAdd = Math.min(eventsPerPartition, (actualNumEvents - allocedEvents));
+            toOffset = Math.min(range.untilOffset(), toOffset + offsetsToAdd);
+          }
+          OffsetRange thisRange = OffsetRange.create(range.topicPartition(), range.fromOffset(), toOffset);
+          finalRanges.add(thisRange);
+          ranges[i] = OffsetRange.create(range.topicPartition(), range.fromOffset() + thisRange.count(), range.untilOffset());
+          allocatedPartitionsThisLoop.add(range.partition());
         }
       }
-      return ranges;
+      if (!needSplitToMinPartitions) {
+        LOG.debug("final ranges merged by topic partition {}", Arrays.toString(mergeRangesByTopicPartition(finalRanges.toArray(new OffsetRange[0]))));
+        return mergeRangesByTopicPartition(finalRanges.toArray(new OffsetRange[0]));
+      }
+      finalRanges.sort(SORT_BY_PARTITION);
+      LOG.debug("final ranges {}", Arrays.toString(finalRanges.toArray(new OffsetRange[0])));
+      return finalRanges.toArray(new OffsetRange[0]);
+    }
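A worked sketch of the new splitting path, with values copied from testComputeOffsetRangesWithMinPartitions below (makeOffsetMap is that test's helper):

    // one topic partition holding offsets 0..1000, batch capped at 300 events,
    // minPartitions = 3  =>  the single Kafka partition is split into 3 ranges
    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(
        makeOffsetMap(new int[] {0}, new long[] {0}),     // 'from' offsets
        makeOffsetMap(new int[] {0}, new long[] {1000}),  // 'to' offsets
        300, 3);
    // => [0,100), [100,200), [200,300), all on partition 0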
+
+    /**
+     * Merge the ranges by topic partition, because we need to maintain the checkpoint
+     * with one offset range per topic partition.
+     * @param oldRanges ranges to merge
+     * @return ranges merged by partition
+     */
+    public static OffsetRange[] mergeRangesByTopicPartition(OffsetRange[] oldRanges) {
+      List<OffsetRange> newRanges = new ArrayList<>();
+      Map<TopicPartition, List<OffsetRange>> tpOffsets = Arrays.stream(oldRanges).collect(Collectors.groupingBy(OffsetRange::topicPartition));
+      for (Map.Entry<TopicPartition, List<OffsetRange>> entry : tpOffsets.entrySet()) {
+        long from = entry.getValue().stream().map(OffsetRange::fromOffset).min(Long::compare).get();
+        long until = entry.getValue().stream().map(OffsetRange::untilOffset).max(Long::compare).get();
+        newRanges.add(OffsetRange.create(entry.getKey(), from, until));
+      }
+      // make sure the result ranges are ordered by partition
+      newRanges.sort(SORT_BY_PARTITION);
+      return newRanges.toArray(new OffsetRange[0]);
+    }
 
     public static long totalNewMessages(OffsetRange[] ranges) {
@@ -242,7 +286,11 @@ public class KafkaOffsetGen {
       throw new HoodieException("sourceLimit should not be less than the number of kafka partitions");
     }
 
-    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents);
+    long minPartitions = props.getLong(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(),
+        KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.defaultValue());
+    LOG.info("getNextOffsetRanges set config " + KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key() + " to " + minPartitions);
+
+    return CheckpointUtils.computeOffsetRanges(fromOffsets, toOffsets, numEvents, minPartitions);
   }
 
   /**
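Design note: the checkpoint string keeps exactly one offset per topic partition, which is why offsetsToStr above now merges before serializing. The round trip, with values from testSplitAndMergeRanges below:

    // two topic partitions split into 4 ranges under minPartitions = 4 ...
    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(
        makeOffsetMap(new int[] {0, 1}, new long[] {0, 0}),
        makeOffsetMap(new int[] {0, 1}, new long[] {100, 500}), 600, 4);
    // ... and collapsed back to one range per partition for the checkpoint
    OffsetRange[] merged = CheckpointUtils.mergeRangesByTopicPartition(ranges);
    // => partition 0: [0,100), partition 1: [0,500)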
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
index e8494fd9526..49e27d0191b 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java
@@ -18,10 +18,11 @@
 package org.apache.hudi.utilities.sources.helpers;
 
+import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
+
 import org.apache.kafka.common.TopicPartition;
 import org.apache.spark.streaming.kafka010.OffsetRange;
 import org.junit.jupiter.api.Test;
-import org.apache.hudi.utilities.sources.helpers.KafkaOffsetGen.CheckpointUtils;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -39,8 +40,8 @@ public class TestCheckpointUtils {
   @Test
   public void testStringToOffsets() {
     OffsetRange[] ranges =
-        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0);
     String checkpointStr = CheckpointUtils.offsetsToStr(ranges);
     Map<TopicPartition, Long> offsetMap = CheckpointUtils.strToOffsets(checkpointStr);
     assertEquals(2, offsetMap.size());
@@ -57,27 +58,35 @@ public class TestCheckpointUtils {
   @Test
   public void testOffsetToString() {
     OffsetRange[] ranges =
-        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0);
     assertEquals(TEST_TOPIC_NAME + ",0:300000,1:350000", CheckpointUtils.offsetsToStr(ranges));
+
+    ranges = new OffsetRange[] {
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 100, 200),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 200, 300)};
+    assertEquals(TEST_TOPIC_NAME + ",0:200,1:300", CheckpointUtils.offsetsToStr(ranges));
   }
 
   @Test
-  public void testComputeOffsetRanges() {
+  public void testComputeOffsetRangesWithoutMinPartitions() {
     // test totalNewMessages()
-    long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
-        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
+    long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[] {OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
     assertEquals(200, totalMsgs);
 
     // should consume all the full data
     OffsetRange[] ranges =
-        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0);
     assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
 
     // should only consume upto limit
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-        makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 10000);
+    ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 10000, 0);
     assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(200000, ranges[0].fromOffset());
     assertEquals(205000, ranges[0].untilOffset());
@@ -85,35 +94,155 @@ public class TestCheckpointUtils {
     assertEquals(250000, ranges[1].fromOffset());
     assertEquals(255000, ranges[1].untilOffset());
 
     // should also consume from new partitions.
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-        makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}), 1000000L);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {300000, 350000, 100000}), 1000000L, 0);
     assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(3, ranges.length);
 
     // for skewed offsets, does not starve any partition & can catch up
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-        makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 100000);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 100000, 0);
     assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(10, ranges[0].count());
     assertEquals(89990, ranges[1].count());
     assertEquals(10000, ranges[2].count());
 
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-        makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 1000000);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 1000000, 0);
     assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(10, ranges[0].count());
     assertEquals(100000, ranges[1].count());
     assertEquals(10000, ranges[2].count());
 
     // not all partitions consume same entries.
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{0, 0, 0, 0, 0}),
-        makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100, 1000, 1000, 1000, 1000}), 1001);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
+        makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001, 0);
     assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(100, ranges[0].count());
     assertEquals(226, ranges[1].count());
-    assertEquals(226, ranges[2].count());
-    assertEquals(226, ranges[3].count());
-    assertEquals(223, ranges[4].count());
+    assertEquals(225, ranges[2].count());
+    assertEquals(225, ranges[3].count());
+    assertEquals(225, ranges[4].count());
+  }
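Why the skew assertions changed: the rewritten allocator recomputes eventsPerPartition before every partition instead of once per round-robin pass. For the 1001-event case above, partition 0 is capped at its 100 available events (ceil(1001/5) = 201 > 100), and the remaining 901 events spread as ceil(901/4) = 226, then ceil(675/3) = 225, ceil(450/2) = 225, and finally 225, so 100 + 226 + 225 + 225 + 225 = 1001 exactly, where the old once-per-pass math landed on 226/226/226/223.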
+
+  @Test
+  public void testComputeOffsetRangesWithMinPartitions() {
+    // default(0) minPartitions
+    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {1000}), 300, 0);
+    assertEquals(1, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(300, ranges[0].untilOffset());
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {0, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {1000, 1000}), 300, 0);
+    assertEquals(2, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(150, ranges[0].untilOffset());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(150, ranges[1].untilOffset());
+
+    // N TopicPartitions to N offset ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2}, new long[] {0, 0, 0}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {1000, 1000, 1000}), 300, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(100, ranges[1].untilOffset());
+    assertEquals(0, ranges[2].fromOffset());
+    assertEquals(100, ranges[2].untilOffset());
+
+    // 1 TopicPartition to N offset ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {1000}), 300, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(100, ranges[1].fromOffset());
+    assertEquals(200, ranges[1].untilOffset());
+    assertEquals(200, ranges[2].fromOffset());
+    assertEquals(300, ranges[2].untilOffset());
+
+    // N skewed TopicPartitions to M offset ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {0, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {100, 500}), 600, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(250, ranges[1].untilOffset());
+    assertEquals(250, ranges[2].fromOffset());
+    assertEquals(500, ranges[2].untilOffset());
+
+    // range is an inexact multiple of minPartitions
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {100}), 600, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(34, ranges[0].untilOffset());
+    assertEquals(34, ranges[1].fromOffset());
+    assertEquals(67, ranges[1].untilOffset());
+    assertEquals(67, ranges[2].fromOffset());
+    assertEquals(100, ranges[2].untilOffset());
+
+    // do not ignore empty ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {100, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {100, 600}), 600, 3);
+    assertEquals(3, ranges.length);
+    assertEquals(0, ranges[0].partition());
+    assertEquals(100, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(1, ranges[1].partition());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(300, ranges[1].untilOffset());
+    assertEquals(1, ranges[2].partition());
+    assertEquals(300, ranges[2].fromOffset());
+    assertEquals(600, ranges[2].untilOffset());
+
+    // all empty ranges, do not ignore empty ranges
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {100, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {100, 0}), 600, 3);
+    assertEquals(0, CheckpointUtils.totalNewMessages(ranges));
+    assertEquals(2, ranges.length);
+    assertEquals(0, ranges[0].partition());
+    assertEquals(100, ranges[0].fromOffset());
+    assertEquals(100, ranges[0].untilOffset());
+    assertEquals(1, ranges[1].partition());
+    assertEquals(0, ranges[1].fromOffset());
+    assertEquals(0, ranges[1].untilOffset());
+
+    // minPartitions more than the number of available events
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {2}), 600, 3);
+    assertEquals(2, ranges.length);
+    assertEquals(0, ranges[0].fromOffset());
+    assertEquals(1, ranges[0].untilOffset());
+    assertEquals(1, ranges[1].fromOffset());
+    assertEquals(2, ranges[1].untilOffset());
+  }
+
+  @Test
+  public void testSplitAndMergeRanges() {
+    OffsetRange[] ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {0, 0}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {100, 500}), 600, 4);
+    assertEquals(4, ranges.length);
+    OffsetRange[] mergedRanges = CheckpointUtils.mergeRangesByTopicPartition(ranges);
+    assertEquals(2, mergedRanges.length);
+    assertEquals(0, mergedRanges[0].partition());
+    assertEquals(0, mergedRanges[0].fromOffset());
+    assertEquals(100, mergedRanges[0].untilOffset());
+    assertEquals(1, mergedRanges[1].partition());
+    assertEquals(0, mergedRanges[1].fromOffset());
+    assertEquals(500, mergedRanges[1].untilOffset());
+
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0}, new long[] {0}),
+        makeOffsetMap(new int[] {0}, new long[] {1000}), 300, 3);
+    assertEquals(3, ranges.length);
+    mergedRanges = CheckpointUtils.mergeRangesByTopicPartition(ranges);
+    assertEquals(1, mergedRanges.length);
+    assertEquals(0, mergedRanges[0].fromOffset());
+    assertEquals(300, mergedRanges[0].untilOffset());
+  }
 
   private static Map<TopicPartition, Long> makeOffsetMap(int[] partitions, long[] offsets) {
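Aside: the assertions above pin the checkpoint wire format, a single line of "topic,partition:offset,..." per source. A round trip using only the calls shown in these tests (a sketch, reusing the split ranges from testOffsetToString):

    String checkpointStr = CheckpointUtils.offsetsToStr(ranges);  // e.g. TEST_TOPIC_NAME + ",0:200,1:300"
    Map<TopicPartition, Long> restored = CheckpointUtils.strToOffsets(checkpointStr);
    // restored maps each TopicPartition to its highest consumed offset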
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index d00b1f92b1f..e7b5a0ab5ee 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieNotSupportedException;
+import org.apache.hudi.utilities.config.KafkaSourceConfig;
 import org.apache.hudi.utilities.ingestion.HoodieIngestionMetrics;
 import org.apache.hudi.utilities.testutils.UtilitiesTestBase.Helpers;
 
@@ -170,6 +171,76 @@ public class TestKafkaOffsetGen {
     assertEquals(500, nextOffsetRanges[1].untilOffset());
   }
 
+  @Test
+  public void testGetNextOffsetRangesWithMinPartitionsForSinglePartition() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(testTopicName, 1);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    TypedProperties props = getConsumerConfigs("earliest", "string");
+
+    // default: no minPartitions set
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(300, nextOffsetRanges[0].untilOffset());
+
+    props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 2L);
+    kafkaOffsetGen = new KafkaOffsetGen(props);
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(150, nextOffsetRanges[0].untilOffset());
+    assertEquals(150, nextOffsetRanges[1].fromOffset());
+    assertEquals(300, nextOffsetRanges[1].untilOffset());
+  }
+
+  @Test
+  public void testGetNextOffsetRangesWithMinPartitionsForMultiPartition() {
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.createTopic(testTopicName, 2);
+    testUtils.sendMessages(testTopicName, Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 1000)));
+    TypedProperties props = getConsumerConfigs("earliest", "string");
+
+    // default: no minPartitions, or minPartitions less than the number of TopicPartitions
+    KafkaOffsetGen kafkaOffsetGen = new KafkaOffsetGen(props);
+    OffsetRange[] nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(2, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].partition());
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(150, nextOffsetRanges[0].untilOffset());
+    assertEquals(1, nextOffsetRanges[1].partition());
+    assertEquals(0, nextOffsetRanges[1].fromOffset());
+    assertEquals(150, nextOffsetRanges[1].untilOffset());
+
+    props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 1L);
+    kafkaOffsetGen = new KafkaOffsetGen(props);
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(2, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].partition());
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(150, nextOffsetRanges[0].untilOffset());
+    assertEquals(1, nextOffsetRanges[1].partition());
+    assertEquals(0, nextOffsetRanges[1].fromOffset());
+    assertEquals(150, nextOffsetRanges[1].untilOffset());
+
+    // minPartitions more than the number of TopicPartitions
+    props.put(KafkaSourceConfig.KAFKA_SOURCE_MIN_PARTITIONS.key(), 4L);
+    kafkaOffsetGen = new KafkaOffsetGen(props);
+    nextOffsetRanges = kafkaOffsetGen.getNextOffsetRanges(Option.empty(), 300, metrics);
+    assertEquals(4, nextOffsetRanges.length);
+    assertEquals(0, nextOffsetRanges[0].partition());
+    assertEquals(0, nextOffsetRanges[0].fromOffset());
+    assertEquals(75, nextOffsetRanges[0].untilOffset());
+    assertEquals(0, nextOffsetRanges[1].partition());
+    assertEquals(75, nextOffsetRanges[1].fromOffset());
+    assertEquals(150, nextOffsetRanges[1].untilOffset());
+    assertEquals(1, nextOffsetRanges[2].partition());
+    assertEquals(0, nextOffsetRanges[2].fromOffset());
+    assertEquals(75, nextOffsetRanges[2].untilOffset());
+    assertEquals(1, nextOffsetRanges[3].partition());
+    assertEquals(75, nextOffsetRanges[3].fromOffset());
+    assertEquals(150, nextOffsetRanges[3].untilOffset());
+  }
+
   @Test
   public void testCheckTopicExists() {
     TypedProperties props = getConsumerConfigs("latest", "string");