stevenzwu commented on code in PR #4943:
URL: https://github.com/apache/iceberg/pull/4943#discussion_r891544989


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -208,6 +211,94 @@ public void testCheckpointRestore() throws Exception {
     }
   }
 
+  @Test
+  public void testConsumeWithMaxSnapshotCountPerMonitorInterval() throws Exception {
+    List<List<Record>> recordsList = generateRecordsAndCommitTxn(10);
+
+    final ScanContext scanContext1 = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .maxSnapshotCountPerMonitorInterval(0)
+        .build();
+
+    AssertHelpers.assertThrows("Should throw exception because of invalid config",
+        IllegalArgumentException.class, "must be greater than zero",
+        () -> {
+          createFunction(scanContext1);
+          return null;
+        }
+    );
+
+    final ScanContext scanContext2 = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .maxSnapshotCountPerMonitorInterval(-10)
+        .build();
+
+    AssertHelpers.assertThrows("Should throw exception because of invalid config",
+        IllegalArgumentException.class, "must be greater than zero",
+        () -> {
+          createFunction(scanContext2);
+          return null;
+        }
+    );
+
+    List<List<Record>> expectedRecords = recordsList.subList(1, recordsList.size());
+
+    // Use the oldest snapshot as starting to avoid the initial case.
+    long oldestSnapshotId = SnapshotUtil.oldestAncestor(table).snapshotId();
+
+    ScanContext scanContext3 = ScanContext.builder()
+        .monitorInterval(Duration.ofMillis(100))
+        .splitSize(1000L)
+        .startSnapshotId(oldestSnapshotId)
+        .maxSnapshotCountPerMonitorInterval(Integer.MAX_VALUE)
+        .build();
+
+    FlinkInputSplit[] expectedSplits = FlinkSplitPlanner
+        .planInputSplits(table, scanContext3, ThreadPools.getWorkerPool());
+
+    Assert.assertEquals("should produce 9 splits", 9, expectedSplits.length);
+
+    for (int maxSnapshotsNum = 1; maxSnapshotsNum < 15; maxSnapshotsNum = maxSnapshotsNum + 1) {
+      ScanContext scanContext = ScanContext.builder()
+          .monitorInterval(Duration.ofMillis(500))
+          .startSnapshotId(oldestSnapshotId)
+          .splitSize(1000L)
+          .maxSnapshotCountPerMonitorInterval(maxSnapshotsNum)
+          .build();
+
+      StreamingMonitorFunction function = createFunction(scanContext);
+      try (AbstractStreamOperatorTestHarness<FlinkInputSplit> harness = createHarness(function)) {
+        harness.setup();
+        harness.open();
+
+        CountDownLatch latch = new CountDownLatch(9);
+        TestSourceContext sourceContext = new TestSourceContext(latch);
+        runSourceFunctionInTask(sourceContext, function);
+        // Ensure the first loop in monitoring finished
+        Thread.sleep(100);
+
+        // Take lock to prevent monitoring loop
+        synchronized (sourceContext.getCheckpointLock()) {
+          if (maxSnapshotsNum < 10) {
+            // it produces one split for one snapshot.
+            Assert.assertEquals("Should produce same splits as max-snapshot-count-per-monitor-interval",
+                maxSnapshotsNum, sourceContext.splits.size());
+          }
+        }
+
+        Assert.assertTrue("Should have expected elements.",

Review Comment:
   I feel it is unnecessary to wait for all the snapshots. Should we make `monitorAndForwardSplits` package-private so that it can be called directly from this test class? We can use `AbstractStreamOperatorTestHarness#getOperator`.
   
   We just need to verify that `monitorAndForwardSplits` limits the number of discovered snapshots/splits to `maxSnapshotsNum`. We can take a few values of `maxSnapshotsNum` (like 3, 9, and 12); there is no need to loop from 1 to 15.
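The following is a minimal, self-contained sketch of the check suggested above. It rests on two assumptions. First, each snapshot yields exactly one split, which is what the quoted test's comment states. Second, there are 9 pending snapshots, matching the "should produce 9 splits" assertion. `MonitorLimitSketch` and its `monitorAndForwardSplits` method are hypothetical stand-ins. They are not Iceberg's `StreamingMonitorFunction`, and they only model the behavior under test: each monitor cycle should forward at most `maxSnapshotsNum` snapshots. A real test would instead call the actual method once it is made package-private.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not Iceberg code) of a monitor cycle that forwards at most
 * maxSnapshotCount pending snapshots per call, assuming one split per snapshot.
 */
class MonitorLimitSketch {
  private final List<Long> pendingSnapshotIds;
  private final int maxSnapshotCount;
  private int nextIndex = 0;

  MonitorLimitSketch(List<Long> pendingSnapshotIds, int maxSnapshotCount) {
    // Mirrors the invalid-config checks in the quoted test (0 and -10 are rejected)
    if (maxSnapshotCount <= 0) {
      throw new IllegalArgumentException(
          "max-snapshot-count-per-monitor-interval must be greater than zero");
    }
    this.pendingSnapshotIds = new ArrayList<>(pendingSnapshotIds);
    this.maxSnapshotCount = maxSnapshotCount;
  }

  /** One monitor cycle: returns the snapshot IDs whose splits are forwarded in this cycle. */
  List<Long> monitorAndForwardSplits() {
    int end = Math.min(nextIndex + maxSnapshotCount, pendingSnapshotIds.size());
    List<Long> batch = new ArrayList<>(pendingSnapshotIds.subList(nextIndex, end));
    nextIndex = end; // the next cycle resumes after the last forwarded snapshot
    return batch;
  }

  /** Builds the IDs 1..count as stand-ins for committed snapshots. */
  static List<Long> snapshotIds(int count) {
    List<Long> ids = new ArrayList<>();
    for (long id = 1; id <= count; id++) {
      ids.add(id);
    }
    return ids;
  }

  public static void main(String[] args) {
    List<Long> ids = snapshotIds(9);
    // A few representative limits instead of looping over 1..15
    for (int max : new int[] {3, 9, 12}) {
      MonitorLimitSketch fn = new MonitorLimitSketch(ids, max);
      System.out.println("max=" + max + " -> first cycle forwards "
          + fn.monitorAndForwardSplits().size() + " splits");
    }
  }
}
```

With 9 pending snapshots, a single cycle forwards 3, 9, and 9 splits for limits of 3, 9, and 12. These three values cover three cases: a limit below the backlog, a limit equal to it, and a limit above it. Calling the method once per value avoids having to sleep and wait for all snapshots to be consumed.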



-- 
This is an automated message from the Apache Git Service.