bvaradar commented on code in PR #8376: URL: https://github.com/apache/hudi/pull/8376#discussion_r1178488660
########## hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java: ########## @@ -57,63 +58,191 @@ public void testStringToOffsets() { @Test public void testOffsetToString() { OffsetRange[] ranges = - CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), - makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L); + CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}), + makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0); assertEquals(TEST_TOPIC_NAME + ",0:300000,1:350000", CheckpointUtils.offsetsToStr(ranges)); + + ranges = new OffsetRange[] { + OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100), + OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200), + OffsetRange.apply(TEST_TOPIC_NAME, 1, 100, 200), + OffsetRange.apply(TEST_TOPIC_NAME, 1, 200, 300)}; + assertEquals(TEST_TOPIC_NAME + ",0:200,1:300", CheckpointUtils.offsetsToStr(ranges)); } @Test - public void testComputeOffsetRanges() { + public void testComputeOffsetRangesWithoutMinPartitions() { // test totalNewMessages() - long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100), - OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)}); + long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[] {OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100), + OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)}); assertEquals(200, totalMsgs); // should consume all the full data OffsetRange[] ranges = - CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), - makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L); + CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}), + makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0); assertEquals(200000, CheckpointUtils.totalNewMessages(ranges)); // should only consume upto limit - ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), - makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 10000); + ranges = CheckpointUtils.computeOffsetRanges( + makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}), + makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 10000, 0); assertEquals(10000, CheckpointUtils.totalNewMessages(ranges)); assertEquals(200000, ranges[0].fromOffset()); assertEquals(205000, ranges[0].untilOffset()); assertEquals(250000, ranges[1].fromOffset()); assertEquals(255000, ranges[1].untilOffset()); // should also consume from new partitions. - ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), - makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}), 1000000L); + ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}), + makeOffsetMap(new int[] {0, 1, 2}, new long[] {300000, 350000, 100000}), 1000000L, 0); assertEquals(300000, CheckpointUtils.totalNewMessages(ranges)); assertEquals(3, ranges.length); // for skewed offsets, does not starve any partition & can catch up - ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), - makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 100000); + ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}), + makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 100000, 0); assertEquals(100000, CheckpointUtils.totalNewMessages(ranges)); assertEquals(10, ranges[0].count()); assertEquals(89990, ranges[1].count()); assertEquals(10000, ranges[2].count()); - ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}), - makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 1000000); + ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}), + makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 1000000, 0); assertEquals(110010, CheckpointUtils.totalNewMessages(ranges)); assertEquals(10, ranges[0].count()); assertEquals(100000, ranges[1].count()); assertEquals(10000, ranges[2].count()); // not all partitions consume same entries. - ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{0, 0, 0, 0, 0}), - makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100, 1000, 1000, 1000, 1000}), 1001); + ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}), + makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001, 0); assertEquals(1001, CheckpointUtils.totalNewMessages(ranges)); assertEquals(100, ranges[0].count()); assertEquals(226, ranges[1].count()); - assertEquals(226, ranges[2].count()); - assertEquals(226, ranges[3].count()); - assertEquals(223, ranges[4].count()); + assertEquals(225, ranges[2].count()); Review Comment: Makes sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org