jerryshao commented on a change in pull request #1627:
URL: https://github.com/apache/iceberg/pull/1627#discussion_r511960994



##########
File path: core/src/test/java/org/apache/iceberg/TestMicroBatchBuilder.java
##########
@@ -140,15 +141,104 @@ public void testGenerateMicroBatchWithSmallTargetSize() {
     Assert.assertTrue(batch5.lastIndexOfSnapshot());
   }
 
-  private static DataFile file(String name) {
+  @Test
+  public void testMicroBatchRespectsRequestedMaximumSize() {
+    // Add files A-E, all of 10kb, and process them in multiple microbatches of varying sizes,
+    // emulating, for example, a dynamically growing source (assuming the total batch is being
+    // built by one process, as in Flink or on the driver in Spark).
+    add(table.newAppend(), files("A", "B", "C", "D", "E"));
+
+    // Request 10kb - Receive file A.
+    MicroBatch batch0 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(0, 10L, true);
+    Assert.assertEquals(1L, batch0.snapshotId());
+    Assert.assertEquals(0, batch0.startFileIndex());
+    Assert.assertEquals(1, batch0.endFileIndex());
+    Assert.assertEquals(10, batch0.sizeInBytes());
+    Assert.assertFalse(batch0.lastIndexOfSnapshot());
+    filesMatch(Lists.newArrayList("A"), filesToScan(batch0.tasks()));
+
+    // Request 35kb - Receive B, C, and D, totalling 30kb.
+    MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(batch0.endFileIndex(), 35L, false);
+    Assert.assertEquals(1, batch1.startFileIndex());
+    Assert.assertEquals(4, batch1.endFileIndex());
+    Assert.assertEquals(30, batch1.sizeInBytes());
+    filesMatch(Lists.newArrayList("B", "C", "D"), filesToScan(batch1.tasks()));
+    Assert.assertFalse(batch1.lastIndexOfSnapshot());
+
+    // Request 35kb - Receive file E, which is the end of the input and only 10kb.
+    MicroBatch batch2 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(batch1.endFileIndex(), 35L, false);
+    Assert.assertEquals(4, batch2.startFileIndex());
+    Assert.assertEquals(5, batch2.endFileIndex());
+    Assert.assertEquals(10, batch2.sizeInBytes());
+    filesMatch(Lists.newArrayList("E"), filesToScan(batch2.tasks()));
+  }
+
+  @Test
+  public void testReadingSnapshotIsNotInterruptedByChildSnapshot() {
+    // Add files A-E, all of 10kb, and process the single generated snapshot
+    // in multiple microbatches.
+    add(table.newAppend(), files("A", "B", "C", "D", "E"));
+    Assert.assertEquals(1L, table.currentSnapshot().snapshotId());
+
+    // Request a batch of 40kb - Reads in A, B, C, and D.
+    MicroBatch batch0 = MicroBatches.from(table.snapshot(1L), table.io())
+            .specsById(table.specs())
+            .generate(0, 40L, false);
+    Assert.assertEquals(0, batch0.startFileIndex());
+    Assert.assertEquals(4, batch0.endFileIndex());
+    Assert.assertEquals(40, batch0.sizeInBytes());
+    filesMatch(Lists.newArrayList("A", "B", "C", "D"), 
filesToScan(batch0.tasks()));
+    Assert.assertFalse(batch0.lastIndexOfSnapshot());
+
+    // Concurrent write sometime after the start of the last batch and before the next batch.
+    final long sizeOfFileF = 25L;
+    add(table.newAppend(),
+            Collections.singletonList(fileWithSize("F", sizeOfFileF)));
+    Assert.assertEquals(2L, table.currentSnapshot().snapshotId());

Review comment:
       This will generate a new snapshot, which seems unrelated to the `MicroBatch` processed before. I'm not sure what the purpose of this test is.
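
       For context, a minimal sketch of where the test presumably goes next (the rest of the hunk is not quoted in this email, so the exact assertions are an assumption): it keeps reading from `table.snapshot(1L)` after the concurrent append, showing that the in-progress read is not interrupted by the child snapshot and that file F never appears in the scan. The helpers `filesMatch` and `filesToScan` are the ones used in the hunk above; the indices follow from the five 10kb files A-E.

           // Hypothetical continuation: still reading snapshot 1, even though
           // the concurrent append made snapshot 2 the current snapshot.
           MicroBatch batch1 = MicroBatches.from(table.snapshot(1L), table.io())
                   .specsById(table.specs())
                   .generate(batch0.endFileIndex(), 35L, false);
           // Only file E (index 4) remains in snapshot 1; F is not visible here.
           Assert.assertEquals(4, batch1.startFileIndex());
           Assert.assertEquals(5, batch1.endFileIndex());
           Assert.assertEquals(10, batch1.sizeInBytes());
           filesMatch(Lists.newArrayList("E"), filesToScan(batch1.tasks()));
           Assert.assertTrue(batch1.lastIndexOfSnapshot());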



