This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new dfecabc76c CP 12988 to Spark version (#13412)
dfecabc76c is described below

commit dfecabc76c5b82ecdde7d61abdf540d43ee14f59
Author: Prashant Singh <[email protected]>
AuthorDate: Tue Jul 1 16:55:14 2025 -0700

    CP 12988 to Spark version (#13412)
    
    Co-authored-by: Prashant Singh <[email protected]>
---
 .../spark/source/SparkMicroBatchStream.java        |  17 +++-
 .../spark/source/TestStructuredStreamingRead3.java | 109 +++++++++------------
 .../spark/source/SparkMicroBatchStream.java        |  15 ++-
 .../spark/source/TestStructuredStreamingRead3.java | 102 +++++++++----------
 4 files changed, 119 insertions(+), 124 deletions(-)

diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 859f1b23bf..fa18b6edd8 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -349,7 +349,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsAdmissio
       }
     }
 
-    // there is no ReadMaxRows, so return the default
+    // There is no ReadMaxRows, so return the default
     return Integer.MAX_VALUE;
   }
 
@@ -388,7 +388,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsAdmissio
     boolean shouldContinueReading = true;
     int curFilesAdded = 0;
     int curRecordCount = 0;
-    int curPos = 0;
+    long curPos = 0;
 
     // Note : we produce nextOffset with pos as non-inclusive
     while (shouldContinueReading) {
@@ -412,14 +412,23 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsAdmissio
           while (taskIter.hasNext()) {
             FileScanTask task = taskIter.next();
             if (curPos >= startPosOfSnapOffset) {
-              if ((curFilesAdded + 1) > getMaxFiles(limit)
-                  || (curRecordCount + task.file().recordCount()) > 
getMaxRows(limit)) {
+              if ((curFilesAdded + 1) > getMaxFiles(limit)) {
+                // On including the file it might happen that we might exceed, 
the configured
+                // soft limit on the number of records, since this is a soft 
limit its acceptable.
                 shouldContinueReading = false;
                 break;
               }
 
               curFilesAdded += 1;
               curRecordCount += task.file().recordCount();
+
+              if (curRecordCount >= getMaxRows(limit)) {
+                // we included the file, so increment the number of files
+                // read in the current snapshot.
+                ++curPos;
+                shouldContinueReading = false;
+                break;
+              }
             }
             ++curPos;
           }
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 5ce8cb6645..4f51819312 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -157,50 +157,42 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   public void testReadStreamWithMaxFiles1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
   public void testReadStreamWithMaxFiles2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")))
-        .isEqualTo(3);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"2"),
+        List.of(3L, 2L, 2L));
   }
 
   @TestTemplate
   public void testReadStreamWithMaxRows1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
 
-    // only 1 micro-batch will be formed and we will read data partially
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
-        .isEqualTo(1);
+    // soft limit of 1 is being enforced, the stream is not blocked.
+    StreamingQuery query = 
startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
 
-    StreamingQuery query =
-        
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
 "1"));
-
-    // check answer correctness only 1 record read the micro-batch will be 
stuck
     List<SimpleRecord> actual = rowsAvailable(query);
     assertThat(actual)
-        .containsExactlyInAnyOrderElementsOf(
-            Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
+        
.containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
   }
 
   @TestTemplate
   public void testReadStreamWithMaxRows2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
-        .isEqualTo(4);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"2"),
+        List.of(3L, 2L, 2L));
 
     StreamingQuery query =
         
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
 "2"));
@@ -214,22 +206,21 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   public void testReadStreamWithMaxRows4() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
-        .isEqualTo(2);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"4"), List.of(4L, 3L));
   }
 
   @TestTemplate
   public void testReadStreamWithCompositeReadLimit() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                ImmutableMap.of(
-                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
-                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+            "4",
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+            "1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -542,10 +533,9 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     makeRewriteDataFiles();
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -557,10 +547,9 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     makeRewriteDataFiles();
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
-        .isEqualTo(2);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"4"),
+        List.of(5L, 2L));
   }
 
   @TestTemplate
@@ -571,15 +560,13 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     appendDataAsMultipleSnapshots(expected);
 
     makeRewriteDataFiles();
-
-    assertThat(
-            microBatchCount(
-                ImmutableMap.of(
-                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
-                    "4",
-                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
-                    "1")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+            "4",
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+            "1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -591,10 +578,9 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     makeRewriteDataFiles();
     makeRewriteDataFiles();
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -608,10 +594,9 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     appendDataAsMultipleSnapshots(expected);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
-        .isEqualTo(12);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L, 1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -778,8 +763,11 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         ImmutableMap.of(key, value, 
SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"));
   }
 
-  private int microBatchCount(Map<String, String> options) throws 
TimeoutException {
+  private void assertMicroBatchRecordSizes(
+      Map<String, String> options, List<Long> expectedMicroBatchRecordSize)
+      throws TimeoutException {
     Dataset<Row> ds = 
spark.readStream().options(options).format("iceberg").load(tableName);
+    List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
 
     ds.writeStream()
         .options(options)
@@ -787,12 +775,13 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
             (VoidFunction2<Dataset<Row>, Long>)
                 (dataset, batchId) -> {
                   microBatches.getAndIncrement();
+                  syncList.add(dataset.count());
                 })
         .start()
         .processAllAvailable();
 
     stopStreams();
-    return microBatches.get();
+    
assertThat(syncList).containsExactlyInAnyOrderElementsOf(expectedMicroBatchRecordSize);
   }
 
   private List<SimpleRecord> rowsAvailable(StreamingQuery query) {
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index d348c3cc57..f4468fe156 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -412,14 +412,23 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsAdmissio
           while (taskIter.hasNext()) {
             FileScanTask task = taskIter.next();
             if (curPos >= startPosOfSnapOffset) {
-              if ((curFilesAdded + 1) > getMaxFiles(limit)
-                  || (curRecordCount + task.file().recordCount()) > 
getMaxRows(limit)) {
+              if ((curFilesAdded + 1) > getMaxFiles(limit)) {
+                // On including the file it might happen that we might exceed, 
the configured
+                // soft limit on the number of records, since this is a soft 
limit its acceptable.
                 shouldContinueReading = false;
                 break;
               }
 
               curFilesAdded += 1;
               curRecordCount += task.file().recordCount();
+
+              if (curRecordCount >= getMaxRows(limit)) {
+                // we included the file, so increment the number of files
+                // read in the current snapshot.
+                ++curPos;
+                shouldContinueReading = false;
+                break;
+              }
             }
             ++curPos;
           }
@@ -436,7 +445,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsAdmissio
       if (shouldContinueReading) {
         Snapshot nextValid = nextValidSnapshot(curSnapshot);
         if (nextValid == null) {
-          // nextValide implies all the remaining snapshots should be skipped.
+          // nextValid implies all the remaining snapshots should be skipped.
           break;
         }
         // we found the next available snapshot, continue from there.
diff --git 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 370abfe096..bb664f0c72 100644
--- 
a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -157,50 +157,44 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   public void testReadStreamWithMaxFiles1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
   public void testReadStreamWithMaxFiles2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "2")))
-        .isEqualTo(3);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"2"),
+        List.of(3L, 2L, 2L));
   }
 
   @TestTemplate
   public void testReadStreamWithMaxRows1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    // only 1 micro-batch will be formed and we will read data partially
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1")))
-        .isEqualTo(1);
-
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
+    // soft limit of 1 is being enforced, the stream is not blocked.
     StreamingQuery query =
         
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
 "1"));
 
     // check answer correctness only 1 record read the micro-batch will be 
stuck
     List<SimpleRecord> actual = rowsAvailable(query);
     assertThat(actual)
-        .containsExactlyInAnyOrderElementsOf(
-            Lists.newArrayList(TEST_DATA_MULTIPLE_SNAPSHOTS.get(0).get(0)));
+        
.containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
   }
 
   @TestTemplate
   public void testReadStreamWithMaxRows2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
-        .isEqualTo(4);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"2"),
+        List.of(3L, 2L, 2L));
 
     StreamingQuery query =
         
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
 "2"));
@@ -214,22 +208,19 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   public void testReadStreamWithMaxRows4() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
-        .isEqualTo(2);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"4"), List.of(4L, 3L));
   }
 
   @TestTemplate
   public void testReadStreamWithCompositeReadLimit() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
 
-    assertThat(
-            microBatchCount(
-                ImmutableMap.of(
-                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
-                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -542,10 +533,9 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     makeRewriteDataFiles();
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -557,10 +547,8 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     makeRewriteDataFiles();
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "4")))
-        .isEqualTo(2);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"4"), List.of(4L, 3L));
   }
 
   @TestTemplate
@@ -572,14 +560,13 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     makeRewriteDataFiles();
 
-    assertThat(
-            microBatchCount(
-                ImmutableMap.of(
-                    SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
-                    "4",
-                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
-                    "1")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
+            "4",
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+            "1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -591,10 +578,9 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     makeRewriteDataFiles();
     makeRewriteDataFiles();
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
-        .isEqualTo(6);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -608,10 +594,9 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     appendDataAsMultipleSnapshots(expected);
 
-    assertThat(
-            microBatchCount(
-                
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")))
-        .isEqualTo(12);
+    assertMicroBatchRecordSizes(
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
+        List.of(1L, 2L, 1L, 1L, 1L, 1L, 1L, 2L, 1L, 1L, 1L, 1L));
   }
 
   @TestTemplate
@@ -778,21 +763,24 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         ImmutableMap.of(key, value, 
SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"));
   }
 
-  private int microBatchCount(Map<String, String> options) throws 
TimeoutException {
+  private void assertMicroBatchRecordSizes(
+      Map<String, String> options, List<Long> expectedMicroBatchRecordSize)
+      throws TimeoutException {
     Dataset<Row> ds = 
spark.readStream().options(options).format("iceberg").load(tableName);
 
+    List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
     ds.writeStream()
         .options(options)
         .foreachBatch(
             (VoidFunction2<Dataset<Row>, Long>)
                 (dataset, batchId) -> {
-                  microBatches.getAndIncrement();
+                  syncList.add(dataset.count());
                 })
         .start()
         .processAllAvailable();
 
     stopStreams();
-    return microBatches.get();
+    
assertThat(syncList).containsExactlyInAnyOrderElementsOf(expectedMicroBatchRecordSize);
   }
 
   private List<SimpleRecord> rowsAvailable(StreamingQuery query) {

Reply via email to