bvaradar commented on code in PR #8376:
URL: https://github.com/apache/hudi/pull/8376#discussion_r1178488660


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCheckpointUtils.java:
##########
@@ -57,63 +58,191 @@ public void testStringToOffsets() {
   @Test
   public void testOffsetToString() {
     OffsetRange[] ranges =
-            CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-                    makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0);
     assertEquals(TEST_TOPIC_NAME + ",0:300000,1:350000", CheckpointUtils.offsetsToStr(ranges));
+
+    ranges = new OffsetRange[] {
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 100, 200),
+        OffsetRange.apply(TEST_TOPIC_NAME, 1, 200, 300)};
+    assertEquals(TEST_TOPIC_NAME + ",0:200,1:300", CheckpointUtils.offsetsToStr(ranges));
   }
 
   @Test
-  public void testComputeOffsetRanges() {
+  public void testComputeOffsetRangesWithoutMinPartitions() {
     // test totalNewMessages()
-    long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[]{OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
-            OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
+    long totalMsgs = CheckpointUtils.totalNewMessages(new OffsetRange[] {OffsetRange.apply(TEST_TOPIC_NAME, 0, 0, 100),
+        OffsetRange.apply(TEST_TOPIC_NAME, 0, 100, 200)});
     assertEquals(200, totalMsgs);
 
     // should consume all the full data
     OffsetRange[] ranges =
-            CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-                    makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 1000000L);
+        CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+            makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 1000000L, 0);
     assertEquals(200000, CheckpointUtils.totalNewMessages(ranges));
 
     // should only consume upto limit
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1}, new long[]{300000, 350000}), 10000);
+    ranges = CheckpointUtils.computeOffsetRanges(
+        makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1}, new long[] {300000, 350000}), 10000, 0);
     assertEquals(10000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(200000, ranges[0].fromOffset());
     assertEquals(205000, ranges[0].untilOffset());
     assertEquals(250000, ranges[1].fromOffset());
     assertEquals(255000, ranges[1].untilOffset());
 
     // should also consume from new partitions.
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{300000, 350000, 100000}), 1000000L);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {300000, 350000, 100000}), 1000000L, 0);
     assertEquals(300000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(3, ranges.length);
 
     // for skewed offsets, does not starve any partition & can catch up
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 100000);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 100000, 0);
     assertEquals(100000, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(10, ranges[0].count());
     assertEquals(89990, ranges[1].count());
     assertEquals(10000, ranges[2].count());
 
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1}, new long[]{200000, 250000}),
-            makeOffsetMap(new int[]{0, 1, 2}, new long[]{200010, 350000, 10000}), 1000000);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1}, new long[] {200000, 250000}),
+        makeOffsetMap(new int[] {0, 1, 2}, new long[] {200010, 350000, 10000}), 1000000, 0);
     assertEquals(110010, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(10, ranges[0].count());
     assertEquals(100000, ranges[1].count());
     assertEquals(10000, ranges[2].count());
 
     // not all partitions consume same entries.
-    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{0, 0, 0, 0, 0}),
-            makeOffsetMap(new int[]{0, 1, 2, 3, 4}, new long[]{100, 1000, 1000, 1000, 1000}), 1001);
+    ranges = CheckpointUtils.computeOffsetRanges(makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {0, 0, 0, 0, 0}),
+        makeOffsetMap(new int[] {0, 1, 2, 3, 4}, new long[] {100, 1000, 1000, 1000, 1000}), 1001, 0);
     assertEquals(1001, CheckpointUtils.totalNewMessages(ranges));
     assertEquals(100, ranges[0].count());
     assertEquals(226, ranges[1].count());
-    assertEquals(226, ranges[2].count());
-    assertEquals(226, ranges[3].count());
-    assertEquals(223, ranges[4].count());
+    assertEquals(225, ranges[2].count());

Review Comment:
   Makes sense. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to