stevenzwu commented on code in PR #4943:
URL: https://github.com/apache/iceberg/pull/4943#discussion_r892549063
##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -208,6 +211,94 @@ public void testCheckpointRestore() throws Exception {
}
}
+ @Test
+ public void testConsumeWithMaxSnapshotCountPerMonitorInterval() throws
Exception {
+ List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+
+ final ScanContext scanContext1 = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .maxSnapshotCountPerMonitorInterval(0)
+ .build();
+
+ AssertHelpers.assertThrows("Should throw exception because of invalid
config",
+ IllegalArgumentException.class, "must be greater than zero",
+ () -> {
+ createFunction(scanContext1);
+ return null;
+ }
+ );
+
+ final ScanContext scanContext2 = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .maxSnapshotCountPerMonitorInterval(-10)
+ .build();
+
+ AssertHelpers.assertThrows("Should throw exception because of invalid
config",
+ IllegalArgumentException.class, "must be greater than zero",
+ () -> {
+ createFunction(scanContext2);
+ return null;
+ }
+ );
+
+ List<List<Record>> expectedRecords = recordsList.subList(1,
recordsList.size());
+
+ // Use the oldest snapshot as starting to avoid the initial case.
+ long oldestSnapshotId = SnapshotUtil.oldestAncestor(table).snapshotId();
+
+ ScanContext scanContext3 = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(100))
+ .splitSize(1000L)
+ .startSnapshotId(oldestSnapshotId)
+ .maxSnapshotCountPerMonitorInterval(Integer.MAX_VALUE)
+ .build();
+
+ FlinkInputSplit[] expectedSplits = FlinkSplitPlanner
+ .planInputSplits(table, scanContext3, ThreadPools.getWorkerPool());
+
+ Assert.assertEquals("should produce 9 splits", 9, expectedSplits.length);
+
+ for (int maxSnapshotsNum = 1; maxSnapshotsNum < 15; maxSnapshotsNum =
maxSnapshotsNum + 1) {
+ ScanContext scanContext = ScanContext.builder()
+ .monitorInterval(Duration.ofMillis(500))
+ .startSnapshotId(oldestSnapshotId)
+ .splitSize(1000L)
+ .maxSnapshotCountPerMonitorInterval(maxSnapshotsNum)
+ .build();
+
+ StreamingMonitorFunction function = createFunction(scanContext);
+ try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness =
createHarness(function)) {
+ harness.setup();
+ harness.open();
+
+ CountDownLatch latch = new CountDownLatch(9);
+ TestSourceContext sourceContext = new TestSourceContext(latch);
+ runSourceFunctionInTask(sourceContext, function);
+ // Ensure the first loop in monitoring finished
+ Thread.sleep(100);
+
+ // Take lock to prevent monitoring loop
+ synchronized (sourceContext.getCheckpointLock()) {
+ if (maxSnapshotsNum < 10) {
+ // it produces one split for one snapshot.
+ Assert.assertEquals("Should produce same splits as
max-snapshot-count-per-monitor-interval",
+ maxSnapshotsNum, sourceContext.splits.size());
+ }
+ }
+
+ Assert.assertTrue("Should have expected elements.",
Review Comment:
A few seconds is too long for a unit test. We only need to test three scenarios
for `maxSnapshotsNum`: less than, equal to, and greater than the number of available snapshots.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]