This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e57247b55a Spark 3.4: Backport Async Micro Batch Planner to 3.4 
(#16311)
e57247b55a is described below

commit e57247b55a737cfc3702949181b3a2a3d10174b2
Author: Kevin Liu <[email protected]>
AuthorDate: Tue May 12 23:29:53 2026 -0400

    Spark 3.4: Backport Async Micro Batch Planner to 3.4 (#16311)
    
    Backport of #15992 to spark/v3.4. Stacked on PR #16307 (#15683 
SerializableFileIOWithSize), which is itself a backport.
    
    Adaptations from the source PR:
    
    - SparkMicroBatchStream.java was replaced wholesale with the v3.5 
post-#15992 version because v3.4 had structural drift; the refactor extracts 
the planning logic into the new planner classes and there are no v3.4-only 
features in this file.
    
    - TestStructuredStreamingRead3.java was likewise replaced with the v3.5 
version (which adds parameterized sync/async coverage). The only non-mechanical 
change is using 'SparkCatalogConfig.SPARK' instead of 
'SparkCatalogConfig.SPARK_SESSION', because v3.4 still uses the older enum name.
---
 .../org/apache/iceberg/spark/SparkReadConf.java    |  33 ++
 .../org/apache/iceberg/spark/SparkReadOptions.java |  15 +
 .../apache/iceberg/spark/SparkSQLProperties.java   |   5 +
 .../spark/source/AsyncSparkMicroBatchPlanner.java  | 543 +++++++++++++++++++++
 .../spark/source/BaseSparkMicroBatchPlanner.java   | 151 ++++++
 .../iceberg/spark/source/MicroBatchUtils.java      |  69 +++
 .../spark/source/SparkMicroBatchPlanner.java       |  47 ++
 .../spark/source/SparkMicroBatchStream.java        | 353 ++------------
 .../spark/source/SyncSparkMicroBatchPlanner.java   | 249 ++++++++++
 .../source/TestAsyncSparkMicroBatchPlanner.java    |  61 +++
 .../spark/source/TestMicroBatchPlanningUtils.java  | 100 ++++
 .../spark/source/TestStructuredStreamingRead3.java | 309 +++++++++++-
 12 files changed, 1588 insertions(+), 347 deletions(-)

diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 3c8556731f..0c13338764 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -265,6 +265,39 @@ public class SparkReadConf {
         .parse();
   }
 
+  public boolean asyncMicroBatchPlanningEnabled() {
+    return confParser
+        .booleanConf()
+        .option(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
+        .sessionConf(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
+        
.defaultValue(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT)
+        .parse();
+  }
+
+  public long streamingSnapshotPollingIntervalMs() {
+    return confParser
+        .longConf()
+        .option(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS)
+        
.defaultValue(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT)
+        .parse();
+  }
+
+  public long asyncQueuePreloadFileLimit() {
+    return confParser
+        .longConf()
+        .option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT)
+        .defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT)
+        .parse();
+  }
+
+  public long asyncQueuePreloadRowLimit() {
+    return confParser
+        .longConf()
+        .option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT)
+        .defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT)
+        .parse();
+  }
+
   public boolean preserveDataGrouping() {
     return confParser
         .booleanConf()
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index c31a7e5554..a6e6479d66 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -92,6 +92,21 @@ public class SparkReadOptions {
   public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
       "streaming-max-rows-per-micro-batch";
 
+  // Enable async micro batch planning
+  public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
+      "async-micro-batch-planning-enabled";
+
+  // Polling interval for async planner to refresh table metadata (ms)
+  public static final String STREAMING_SNAPSHOT_POLLING_INTERVAL_MS =
+      "streaming-snapshot-polling-interval-ms";
+  public static final long STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT = 
30000L;
+
+  // Initial queue preload limits for async micro batch planner
+  public static final String ASYNC_QUEUE_PRELOAD_FILE_LIMIT = 
"async-queue-preload-file-limit";
+  public static final long ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT = 100L;
+  public static final String ASYNC_QUEUE_PRELOAD_ROW_LIMIT = 
"async-queue-preload-row-limit";
+  public static final long ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT = 100000L;
+
   // Table path
   public static final String PATH = "path";
 
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 41dc73add3..1ccf8c3c83 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -106,4 +106,9 @@ public class SparkSQLProperties {
   // Controls whether to report available column statistics to Spark for query 
optimization.
   public static final String REPORT_COLUMN_STATS = 
"spark.sql.iceberg.report-column-stats";
   public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
+
+  // Controls whether to enable async micro batch planning for session
+  public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
+      "spark.sql.iceberg.async-micro-batch-planning-enabled";
+  public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = 
false;
 }
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..3e442f9917
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import 
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.connector.read.streaming.ReadAllAvailable;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner 
implements AutoCloseable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncSparkMicroBatchPlanner.class);
+  private static final int PLAN_FILES_CACHE_MAX_SIZE = 10;
+  private static final long QUEUE_POLL_TIMEOUT_MS = 100L; // 100 ms
+
+  private final long minQueuedFiles;
+  private final long minQueuedRows;
+
+  // Cache for planFiles results to handle duplicate calls
+  private final Cache<Pair<StreamingOffset, StreamingOffset>, 
List<FileScanTask>> planFilesCache;
+
+  // Queue to buffer pre-fetched file scan tasks
+  private final LinkedBlockingDeque<Pair<StreamingOffset, FileScanTask>> queue;
+
+  // Background executor for async operations
+  private final ScheduledExecutorService executor;
+
+  // Error tracking
+  private volatile Throwable refreshFailedThrowable;
+  private volatile Throwable fillQueueFailedThrowable;
+
+  // Tracking queue state
+  private final AtomicLong queuedFileCount = new AtomicLong(0);
+  private final AtomicLong queuedRowCount = new AtomicLong(0);
+  private Snapshot lastQueuedSnapshot;
+  private boolean stopped;
+
+  // Cap for Trigger.AvailableNow - don't process beyond this offset
+  private final StreamingOffset lastOffsetForTriggerAvailableNow;
+
+  /**
+   * This class manages a queue of FileScanTask + StreamingOffset. On 
creation, it starts up an
+   * asynchronous polling process which populates the queue when a new 
snapshot arrives or the
+   * minimum amount of queued data is too low.
+   *
+   * <p>Note: this will capture the state of the table when snapshots are 
added to the queue. If a
+   * snapshot is expired after being added to the queue, the job will still 
process it.
+   */
+  AsyncSparkMicroBatchPlanner(
+      Table table,
+      SparkReadConf readConf,
+      StreamingOffset initialOffset,
+      StreamingOffset maybeEndOffset,
+      StreamingOffset lastOffsetForTriggerAvailableNow) {
+    super(table, readConf);
+    this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
+    this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
+    this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
+    this.planFilesCache = 
Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build();
+    this.queue = new LinkedBlockingDeque<>();
+
+    table().refresh();
+
+    // Synchronously add data to the queue to meet our initial constraints.
+    // For Trigger.AvailableNow, constructor-time preload is normally 
initialized from
+    // latestOffset(...) with no explicit end offset, so bounded preload must 
stop at
+    // Trigger.AvailableNow snapshot.
+    fillQueue(initialOffset, maybeEndOffset);
+
+    this.executor =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread thread = new Thread(r, "iceberg-async-planner-" + 
table().name());
+              thread.setDaemon(true);
+              return thread;
+            });
+    // Schedule table refresh at configured interval
+    long pollingIntervalMs = readConf().streamingSnapshotPollingIntervalMs();
+    this.executor.scheduleWithFixedDelay(
+        this::refreshAndTrapException, pollingIntervalMs, pollingIntervalMs, 
TimeUnit.MILLISECONDS);
+    // Schedule queue fill to run frequently (use polling interval for tests, 
cap at 100ms for
+    // production)
+    long queueFillIntervalMs = Math.min(QUEUE_POLL_TIMEOUT_MS, 
pollingIntervalMs);
+    executor.scheduleWithFixedDelay(
+        () -> fillQueueAndTrapException(lastQueuedSnapshot),
+        0,
+        queueFillIntervalMs,
+        TimeUnit.MILLISECONDS);
+
+    LOG.info(
+        "Started AsyncSparkMicroBatchPlanner for {} from initialOffset: {}",
+        table().name(),
+        initialOffset);
+  }
+
+  @Override
+  public synchronized void stop() {
+    Preconditions.checkArgument(
+        !stopped, "AsyncSparkMicroBatchPlanner for {} was already stopped", 
table().name());
+    stopped = true;
+    LOG.info("Stopping AsyncSparkMicroBatchPlanner for table: {}", 
table().name());
+    executor.shutdownNow();
+    boolean terminated = false;
+    try {
+      terminated =
+          executor.awaitTermination(
+              readConf().streamingSnapshotPollingIntervalMs() * 2, 
TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ignored) {
+      // Restore interrupt status
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("AsyncSparkMicroBatchPlanner for table: {}, stopped: {}", 
table().name(), terminated);
+  }
+
+  @Override
+  public void close() {
+    stop();
+  }
+
+  /**
+   * Spark can call this multiple times; it should produce the same answer 
every time.
+   *
+   * @param startOffset the starting offset of this microbatch, position is 
inclusive
+   * @param endOffset the end offset of this microbatch, position is exclusive
+   * @return the list of files to scan between these offsets
+   */
+  @Override
+  public synchronized List<FileScanTask> planFiles(
+      StreamingOffset startOffset, StreamingOffset endOffset) {
+    return planFilesCache.get(
+        Pair.of(startOffset, endOffset),
+        key -> {
+          LOG.info(
+              "running planFiles for {}, startOffset: {}, endOffset: {}",
+              table().name(),
+              startOffset,
+              endOffset);
+          List<FileScanTask> result = new LinkedList<>();
+          Pair<StreamingOffset, FileScanTask> elem;
+          StreamingOffset currentOffset;
+          boolean shouldTerminate = false;
+          long filesInPlan = 0;
+          long rowsInPlan = 0;
+
+          do {
+            try {
+              elem = queue.pollFirst(QUEUE_POLL_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException("Interrupted while polling queue", e);
+            }
+
+            if (elem != null) {
+              currentOffset = elem.first();
+              LOG.debug("planFiles consumed: {}", currentOffset);
+              FileScanTask currentTask = elem.second();
+              filesInPlan += 1;
+              long elemRows = currentTask.file().recordCount();
+              rowsInPlan += elemRows;
+              queuedFileCount.decrementAndGet();
+              queuedRowCount.addAndGet(-elemRows);
+              result.add(currentTask);
+
+              // try to peek at the next entry of the queue and see if we 
should stop
+              Pair<StreamingOffset, FileScanTask> nextElem = queue.peekFirst();
+              boolean endOffsetPeek = false;
+              if (nextElem != null) {
+                endOffsetPeek = endOffset.equals(nextElem.first());
+              }
+              // end offset may be synthetic and not exist in the queue
+              boolean endOffsetSynthetic =
+                  currentOffset.snapshotId() == endOffset.snapshotId()
+                      && (currentOffset.position() + 1) == 
endOffset.position();
+              shouldTerminate = endOffsetPeek || endOffsetSynthetic;
+            } else {
+              LOG.trace("planFiles hasn't reached {}, waiting", endOffset);
+            }
+          } while (!shouldTerminate
+              && refreshFailedThrowable == null
+              && fillQueueFailedThrowable == null);
+
+          if (refreshFailedThrowable != null) {
+            throw new RuntimeException("Table refresh failed", 
refreshFailedThrowable);
+          }
+
+          if (fillQueueFailedThrowable != null) {
+            throw new RuntimeException("Queue filling failed", 
fillQueueFailedThrowable);
+          }
+
+          LOG.info(
+              "completed planFiles for {}, startOffset: {}, endOffset: {}, 
files: {}, rows: {}",
+              table().name(),
+              startOffset,
+              endOffset,
+              filesInPlan,
+              rowsInPlan);
+          return result;
+        });
+  }
+
+  /**
+   * This needs to be non destructive on the queue as spark could call this 
multiple times. Each
+   * time, depending on the table state it could return something different
+   *
+   * @param startOffset the starting offset of the next microbatch
+   * @param limit a limit for how many files/bytes/rows the next microbatch 
should include
+   * @return The end offset to use for the next microbatch, null signals that 
no data is available
+   */
+  @Override
+  public synchronized StreamingOffset latestOffset(StreamingOffset 
startOffset, ReadLimit limit) {
+    LOG.info(
+        "running latestOffset for {}, startOffset: {}, limit: {}",
+        table().name(),
+        startOffset,
+        limit);
+
+    if (table().currentSnapshot() == null) {
+      LOG.info("latestOffset returning START_OFFSET, currentSnapshot() is 
null");
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (table().currentSnapshot().timestampMillis() < 
readConf().streamFromTimestamp()) {
+      LOG.info("latestOffset returning START_OFFSET, currentSnapshot() < 
fromTimestamp");
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // if any exceptions were encountered in the background process, raise 
them here
+    if (refreshFailedThrowable != null) {
+      throw new RuntimeException(refreshFailedThrowable);
+    }
+    if (fillQueueFailedThrowable != null) {
+      throw new RuntimeException(fillQueueFailedThrowable);
+    }
+
+    // if we want to read all available we don't need to scan files, just 
snapshots
+    if (limit instanceof ReadAllAvailable) {
+      // If Trigger.AvailableNow cap is set, return it directly
+      if (this.lastOffsetForTriggerAvailableNow != null) {
+        return this.lastOffsetForTriggerAvailableNow;
+      }
+      Snapshot lastValidSnapshot = table().snapshot(startOffset.snapshotId());
+      Snapshot nextValidSnapshot;
+      do {
+        nextValidSnapshot = nextValidSnapshot(lastValidSnapshot);
+        if (nextValidSnapshot != null) {
+          lastValidSnapshot = nextValidSnapshot;
+        }
+      } while (nextValidSnapshot != null);
+      return new StreamingOffset(
+          lastValidSnapshot.snapshotId(),
+          MicroBatchUtils.addedFilesCount(table(), lastValidSnapshot),
+          false);
+    }
+
+    return computeLimitedOffset(limit);
+  }
+
+  private StreamingOffset computeLimitedOffset(ReadLimit limit) {
+    UnpackedLimits unpackedLimits = new UnpackedLimits(limit);
+    long rowsSeen = 0;
+    long filesSeen = 0;
+    LOG.debug(
+        "latestOffset queue status, queuedFiles: {}, queuedRows: {}",
+        queuedFileCount.get(),
+        queuedRowCount.get());
+
+    List<Pair<StreamingOffset, FileScanTask>> queueSnapshot = 
Lists.newArrayList(queue);
+    Pair<StreamingOffset, FileScanTask> queueTail =
+        queueSnapshot.isEmpty() ? null : 
queueSnapshot.get(queueSnapshot.size() - 1);
+
+    for (int i = 0; i < queueSnapshot.size(); i++) {
+      Pair<StreamingOffset, FileScanTask> elem = queueSnapshot.get(i);
+      long fileRows = elem.second().file().recordCount();
+
+      // Hard limit on files - stop BEFORE exceeding
+      if (filesSeen + 1 > unpackedLimits.getMaxFiles()) {
+        if (filesSeen == 0) {
+          return null;
+        }
+        LOG.debug(
+            "latestOffset hit file limit at {}, rows: {}, files: {}",
+            elem.first(),
+            rowsSeen,
+            filesSeen);
+        return elem.first();
+      }
+
+      // Soft limit on rows - include file FIRST, then check
+      rowsSeen += fileRows;
+      filesSeen += 1;
+
+      // Check if we've hit the row limit after including this file
+      if (rowsSeen >= unpackedLimits.getMaxRows()) {
+        if (filesSeen == 1 && rowsSeen > unpackedLimits.getMaxRows()) {
+          LOG.warn(
+              "File {} at offset {} contains {} records, exceeding 
maxRecordsPerMicroBatch limit of {}. "
+                  + "This file will be processed entirely to guarantee forward 
progress. "
+                  + "Consider increasing the limit or writing smaller files to 
avoid unexpected memory usage.",
+              elem.second().file().location(),
+              elem.first(),
+              fileRows,
+              unpackedLimits.getMaxRows());
+        }
+        // Return the offset of the NEXT element (or synthesize tail+1)
+        if (i + 1 < queueSnapshot.size()) {
+          LOG.debug(
+              "latestOffset hit row limit at {}, rows: {}, files: {}",
+              queueSnapshot.get(i + 1).first(),
+              rowsSeen,
+              filesSeen);
+          return queueSnapshot.get(i + 1).first();
+        } else {
+          // This is the last element - return tail+1
+          StreamingOffset current = elem.first();
+          StreamingOffset result =
+              new StreamingOffset(
+                  current.snapshotId(), current.position() + 1, 
current.shouldScanAllFiles());
+          LOG.debug(
+              "latestOffset hit row limit at tail {}, rows: {}, files: {}",
+              result,
+              rowsSeen,
+              filesSeen);
+          return result;
+        }
+      }
+    }
+
+    // if we got here there aren't enough files to exceed our limits
+    if (queueTail != null) {
+      StreamingOffset tailOffset = queueTail.first();
+      // we have to increment the position by 1 since we want to include the 
tail in the read and
+      // position is non-inclusive
+      StreamingOffset latestOffset =
+          new StreamingOffset(
+              tailOffset.snapshotId(), tailOffset.position() + 1, 
tailOffset.shouldScanAllFiles());
+      LOG.debug("latestOffset returning all queued data {}", latestOffset);
+      return latestOffset;
+    }
+
+    // if we got here the queue is empty
+    LOG.debug("latestOffset no data, returning null");
+    return null;
+  }
+
+  // Background task wrapper that traps exceptions
+  private void refreshAndTrapException() {
+    try {
+      table().refresh();
+    } catch (Throwable t) {
+      LOG.error("Failed to refresh table {}", table().name(), t);
+      refreshFailedThrowable = t;
+    }
+  }
+
+  // Background task wrapper that traps exceptions
+  private void fillQueueAndTrapException(Snapshot snapshot) {
+    try {
+      fillQueue(snapshot);
+    } catch (Throwable t) {
+      LOG.error("Failed to fill queue for table {}", table().name(), t);
+      fillQueueFailedThrowable = t;
+    }
+  }
+
+  /** Generate a MicroBatch based on input parameters and add to the queue */
+  private void addMicroBatchToQueue(
+      Snapshot snapshot, long startFileIndex, long endFileIndex, boolean 
shouldScanAllFile) {
+    LOG.info("Adding MicroBatch for snapshot: {} to the queue", 
snapshot.snapshotId());
+    MicroBatches.MicroBatch microBatch =
+        MicroBatches.from(snapshot, table().io())
+            .caseSensitive(readConf().caseSensitive())
+            .specsById(table().specs())
+            .generate(startFileIndex, endFileIndex, Long.MAX_VALUE, 
shouldScanAllFile);
+
+    long position = startFileIndex;
+    for (FileScanTask task : microBatch.tasks()) {
+      Pair<StreamingOffset, FileScanTask> elem =
+          Pair.of(new StreamingOffset(microBatch.snapshotId(), position, 
shouldScanAllFile), task);
+      queuedFileCount.incrementAndGet();
+      queuedRowCount.addAndGet(task.file().recordCount());
+      queue.addLast(elem);
+      position += 1;
+    }
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("\n");
+      for (Pair<StreamingOffset, FileScanTask> elem : queue) {
+        sb.append(elem.first()).append("\n");
+      }
+      LOG.debug(sb.toString());
+    }
+    lastQueuedSnapshot = snapshot;
+  }
+
+  private void fillQueue(StreamingOffset fromOffset, StreamingOffset toOffset) 
{
+    LOG.debug("filling queue from {}, to: {}", fromOffset, toOffset);
+    Snapshot currentSnapshot = table().snapshot(fromOffset.snapshotId());
+    // this could be a partial snapshot so add it outside the loop
+    if (currentSnapshot != null) {
+      addMicroBatchToQueue(
+          currentSnapshot,
+          fromOffset.position(),
+          MicroBatchUtils.addedFilesCount(table(), currentSnapshot),
+          fromOffset.shouldScanAllFiles());
+    }
+    if (toOffset != null) {
+      if (currentSnapshot != null) {
+        while (currentSnapshot.snapshotId() != toOffset.snapshotId()) {
+          currentSnapshot = nextValidSnapshot(currentSnapshot);
+          if (currentSnapshot != null) {
+            addMicroBatchToQueue(
+                currentSnapshot,
+                0,
+                MicroBatchUtils.addedFilesCount(table(), currentSnapshot),
+                false);
+          } else {
+            break;
+          }
+        }
+      }
+      // toOffset snapshot already added in loop when currentSnapshot == 
toOffset
+    } else {
+      fillQueueInitialBuffer(currentSnapshot);
+    }
+  }
+
+  private void fillQueueInitialBuffer(Snapshot startSnapshot) {
+    // toOffset is null - fill initial buffer to prevent queue starvation 
before background
+    // thread starts. Use configured limits to avoid loading all snapshots
+    // (which could cause OOM on tables with thousands of snapshots).
+    long targetRows = readConf().asyncQueuePreloadRowLimit();
+    long targetFiles = readConf().asyncQueuePreloadFileLimit();
+
+    Snapshot preloadEndSnapshot = initialPreloadEndSnapshot();
+    if (preloadEndSnapshot == null) {
+      return; // Empty table
+    }
+
+    // START_OFFSET case: initialize using nextValidSnapshot which respects 
timestamp filtering
+    Snapshot current = startSnapshot;
+    if (current == null) {
+      current = nextValidSnapshot(null);
+      if (current != null) {
+        addMicroBatchToQueue(current, 0, 
MicroBatchUtils.addedFilesCount(table(), current), false);
+      }
+    }
+
+    // Continue loading more snapshots within safety limits
+    if (current != null) {
+      while ((queuedRowCount.get() < targetRows || queuedFileCount.get() < 
targetFiles)
+          && current.snapshotId() != preloadEndSnapshot.snapshotId()) {
+        current = nextValidSnapshot(current);
+        if (current != null) {
+          addMicroBatchToQueue(
+              current, 0, MicroBatchUtils.addedFilesCount(table(), current), 
false);
+        } else {
+          break;
+        }
+      }
+    }
+  }
+
+  private Snapshot initialPreloadEndSnapshot() {
+    if (lastOffsetForTriggerAvailableNow != null) {
+      return table().snapshot(lastOffsetForTriggerAvailableNow.snapshotId());
+    }
+
+    return table().currentSnapshot();
+  }
+
+  @VisibleForTesting
+  static boolean reachedAvailableNowCap(
+      Snapshot readFrom, StreamingOffset lastOffsetForTriggerAvailableNow) {
+    return lastOffsetForTriggerAvailableNow != null
+        && readFrom != null
+        && readFrom.snapshotId() == 
lastOffsetForTriggerAvailableNow.snapshotId();
+  }
+
+  /** Try to populate the queue with data from unread snapshots */
+  private void fillQueue(Snapshot readFrom) {
+    // Don't add beyond cap for Trigger.AvailableNow
+    if (reachedAvailableNowCap(readFrom, lastOffsetForTriggerAvailableNow)) {
+      LOG.debug(
+          "Reached cap snapshot {}, not adding more",
+          this.lastOffsetForTriggerAvailableNow.snapshotId());
+      return;
+    }
+
+    if ((queuedRowCount.get() > minQueuedRows) || (queuedFileCount.get() > 
minQueuedFiles)) {
+      // we have enough data buffered, check back shortly
+      LOG.debug(
+          "Buffer is full, {} > {} or {} > {}",
+          queuedRowCount.get(),
+          minQueuedRows,
+          queuedFileCount.get(),
+          minQueuedFiles);
+    } else {
+      // add an entire snapshot to the queue
+      Snapshot nextValidSnapshot = nextValidSnapshot(readFrom);
+      if (nextValidSnapshot != null) {
+        addMicroBatchToQueue(
+            nextValidSnapshot,
+            0,
+            MicroBatchUtils.addedFilesCount(table(), nextValidSnapshot),
+            false);
+      } else {
+        LOG.debug("No snapshots ready to be read");
+      }
+    }
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..9298c2bbdf
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Locale;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+abstract class BaseSparkMicroBatchPlanner implements SparkMicroBatchPlanner {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BaseSparkMicroBatchPlanner.class);
+  private final Table table;
+  private final SparkReadConf readConf;
+
+  BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf) {
+    this.table = table;
+    this.readConf = readConf;
+  }
+
+  protected Table table() {
+    return table;
+  }
+
+  protected SparkReadConf readConf() {
+    return readConf;
+  }
+
+  protected boolean shouldProcess(Snapshot snapshot) {
+    String op = snapshot.operation();
+    switch (op) {
+      case DataOperations.APPEND:
+        return true;
+      case DataOperations.REPLACE:
+        return false;
+      case DataOperations.DELETE:
+        Preconditions.checkState(
+            readConf.streamingSkipDeleteSnapshots(),
+            "Cannot process delete snapshot: %s, to ignore deletes, set 
%s=true",
+            snapshot.snapshotId(),
+            SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+        return false;
+      case DataOperations.OVERWRITE:
+        Preconditions.checkState(
+            readConf.streamingSkipOverwriteSnapshots(),
+            "Cannot process overwrite snapshot: %s, to ignore overwrites, set 
%s=true",
+            snapshot.snapshotId(),
+            SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
+        return false;
+      default:
+        throw new IllegalStateException(
+            String.format(
+                "Cannot process unknown snapshot operation: %s (snapshot id 
%s)",
+                op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
+    }
+  }
+
+  /**
+   * Get the next snapshot skipping over rewrite and delete snapshots. Async 
must handle nulls.
+   *
+   * @param curSnapshot the current snapshot
+   * @return the next valid snapshot (not a rewrite or delete snapshot), 
returns null if all
+   *     remaining snapshots should be skipped.
+   */
+  protected Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Snapshot nextSnapshot;
+    // if there were no valid snapshots, check for an initialOffset again
+    if (curSnapshot == null) {
+      StreamingOffset startingOffset =
+          MicroBatchUtils.determineStartingOffset(table, 
readConf.streamFromTimestamp());
+      LOG.debug("determineStartingOffset picked startingOffset: {}", 
startingOffset);
+      if (StreamingOffset.START_OFFSET.equals(startingOffset)) {
+        return null;
+      }
+      nextSnapshot = table.snapshot(startingOffset.snapshotId());
+    } else {
+      if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+        return null;
+      }
+      nextSnapshot = SnapshotUtil.snapshotAfter(table, 
curSnapshot.snapshotId());
+    }
+    // skip over rewrite and delete snapshots
+    while (!shouldProcess(nextSnapshot)) {
+      LOG.debug("Skipping snapshot: {}", nextSnapshot);
+      // if the currentSnapShot was also the mostRecentSnapshot then break
+      // avoids snapshotAfter throwing exception since there are no more 
snapshots to process
+      if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+        return null;
+      }
+      nextSnapshot = SnapshotUtil.snapshotAfter(table, 
nextSnapshot.snapshotId());
+    }
+    return nextSnapshot;
+  }
+
+  static class UnpackedLimits {
+    private long maxRows = Integer.MAX_VALUE;
+    private long maxFiles = Integer.MAX_VALUE;
+
+    UnpackedLimits(ReadLimit limit) {
+      if (limit instanceof CompositeReadLimit) {
+        ReadLimit[] compositeLimits = ((CompositeReadLimit) 
limit).getReadLimits();
+        for (ReadLimit individualLimit : compositeLimits) {
+          if (individualLimit instanceof ReadMaxRows) {
+            ReadMaxRows readMaxRows = (ReadMaxRows) individualLimit;
+            this.maxRows = Math.min(this.maxRows, readMaxRows.maxRows());
+          } else if (individualLimit instanceof ReadMaxFiles) {
+            ReadMaxFiles readMaxFiles = (ReadMaxFiles) individualLimit;
+            this.maxFiles = Math.min(this.maxFiles, readMaxFiles.maxFiles());
+          }
+        }
+      } else if (limit instanceof ReadMaxRows) {
+        this.maxRows = ((ReadMaxRows) limit).maxRows();
+      } else if (limit instanceof ReadMaxFiles) {
+        this.maxFiles = ((ReadMaxFiles) limit).maxFiles();
+      }
+    }
+
+    public long getMaxRows() {
+      return maxRows;
+    }
+
+    public long getMaxFiles() {
+      return maxFiles;
+    }
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
new file mode 100644
index 0000000000..7c73e3f416
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+
+class MicroBatchUtils {
+
+  private MicroBatchUtils() {}
+
+  static StreamingOffset determineStartingOffset(Table table, long 
fromTimestamp) {
+    if (table.currentSnapshot() == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (fromTimestamp == Long.MIN_VALUE) {
+      // start from the oldest snapshot, since default value is MIN_VALUE
+      // avoids looping to find first snapshot
+      return new 
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+    }
+
+    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    try {
+      Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, 
fromTimestamp);
+      if (snapshot != null) {
+        return new StreamingOffset(snapshot.snapshotId(), 0, false);
+      } else {
+        return StreamingOffset.START_OFFSET;
+      }
+    } catch (IllegalStateException e) {
+      // could not determine the first snapshot after the timestamp. use the 
oldest ancestor instead
+      return new 
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+    }
+  }
+
+  static long addedFilesCount(Table table, Snapshot snapshot) {
+    long addedFilesCount =
+        PropertyUtil.propertyAsLong(snapshot.summary(), 
SnapshotSummary.ADDED_FILES_PROP, -1);
+    return addedFilesCount == -1
+        ? Iterables.size(
+            
SnapshotChanges.builderFor(table).snapshot(snapshot).build().addedDataFiles())
+        : addedFilesCount;
+  }
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..1986ddac5d
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileScanTask;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+
+interface SparkMicroBatchPlanner {
+  /**
+   * Return the {@link FileScanTask}s for data added between the start and end 
offsets.
+   *
+   * @param startOffset the offset to start planning from
+   * @param endOffset the offset to plan up to
+   * @return file scan tasks for data in the offset range
+   */
+  List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset 
endOffset);
+
+  /**
+   * Return the latest offset the stream can advance to from {@code 
startOffset}, respecting the
+   * given {@link ReadLimit}.
+   *
+   * @param startOffset the current offset of the stream
+   * @param limit the read limit bounding how far ahead to advance
+   * @return the latest available offset, or {@code null} if no new data is 
available
+   */
+  StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit);
+
+  /** Stop the planner and release any resources. */
+  void stop();
+}
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 06bc16bba8..a1ff767fe2 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -26,48 +26,32 @@ import java.io.OutputStreamWriter;
 import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.Locale;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.MicroBatches;
-import org.apache.iceberg.MicroBatches.MicroBatch;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotChanges;
-import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.InputFile;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.base.Joiner;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.SnapshotUtil;
 import org.apache.iceberg.util.TableScanUtil;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
-import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.connector.read.streaming.ReadLimit;
-import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
-import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
 import 
org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,6 +63,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
 
   private final Table table;
   private final Supplier<FileIO> fileIO;
+  private final SparkReadConf readConf;
   private final String branch;
   private final boolean caseSensitive;
   private final String expectedSchema;
@@ -89,12 +74,11 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   private final long splitOpenFileCost;
   private final boolean localityPreferred;
   private final StreamingOffset initialOffset;
-  private final boolean skipDelete;
-  private final boolean skipOverwrite;
   private final long fromTimestamp;
   private final int maxFilesPerMicroBatch;
   private final int maxRecordsPerMicroBatch;
   private final boolean cacheDeleteFilesOnExecutors;
+  private SparkMicroBatchPlanner planner;
   private StreamingOffset lastOffsetForTriggerAvailableNow;
 
   SparkMicroBatchStream(
@@ -106,6 +90,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
       String checkpointLocation) {
     this.table = table;
     this.fileIO = fileIO;
+    this.readConf = readConf;
     this.branch = readConf.branch();
     this.caseSensitive = readConf.caseSensitive();
     this.expectedSchema = SchemaParser.toJson(expectedSchema);
@@ -124,9 +109,6 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
         new InitialOffsetStore(
             table, checkpointLocation, fromTimestamp, 
sparkContext.hadoopConfiguration());
     this.initialOffset = initialOffsetStore.initialOffset();
-
-    this.skipDelete = readConf.streamingSkipDeleteSnapshots();
-    this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
   }
 
   @Override
@@ -141,8 +123,8 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
     }
 
     Snapshot latestSnapshot = table.currentSnapshot();
-
-    return new StreamingOffset(latestSnapshot.snapshotId(), 
addedFilesCount(latestSnapshot), false);
+    return new StreamingOffset(
+        latestSnapshot.snapshotId(), MicroBatchUtils.addedFilesCount(table, 
latestSnapshot), false);
   }
 
   @Override
@@ -161,7 +143,11 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
     StreamingOffset endOffset = (StreamingOffset) end;
     StreamingOffset startOffset = (StreamingOffset) start;
 
-    List<FileScanTask> fileScanTasks = planFiles(startOffset, endOffset);
+    if (planner == null) {
+      initializePlanner(startOffset, endOffset);
+    }
+
+    List<FileScanTask> fileScanTasks = planner.planFiles(startOffset, 
endOffset);
 
     CloseableIterable<FileScanTask> splitTasks =
         
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks), 
splitSize);
@@ -171,7 +157,6 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
     String[][] locations = computePreferredLocations(combinedScanTasks);
 
     InputPartition[] partitions = new InputPartition[combinedScanTasks.size()];
-
     for (int index = 0; index < combinedScanTasks.size(); index++) {
       partitions[index] =
           new SparkInputPartition(
@@ -214,318 +199,35 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   public void commit(Offset end) {}
 
   @Override
-  public void stop() {}
-
-  private List<FileScanTask> planFiles(StreamingOffset startOffset, 
StreamingOffset endOffset) {
-    List<FileScanTask> fileScanTasks = Lists.newArrayList();
-    StreamingOffset batchStartOffset =
-        StreamingOffset.START_OFFSET.equals(startOffset)
-            ? determineStartingOffset(table, fromTimestamp)
-            : startOffset;
-
-    StreamingOffset currentOffset = null;
-
-    // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
-    do {
-      long endFileIndex;
-      if (currentOffset == null) {
-        currentOffset = batchStartOffset;
-      } else {
-        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table, 
currentOffset.snapshotId());
-        // it may happen that we need to read this snapshot partially in case 
it's equal to
-        // endOffset.
-        if (currentOffset.snapshotId() != endOffset.snapshotId()) {
-          currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, 
false);
-        } else {
-          currentOffset = endOffset;
-        }
-      }
-
-      Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-
-      validateCurrentSnapshotExists(snapshot, currentOffset);
-
-      if (!shouldProcess(snapshot)) {
-        LOG.debug("Skipping snapshot: {} of table {}", 
currentOffset.snapshotId(), table.name());
-        continue;
-      }
-
-      Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
-      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
-        endFileIndex = endOffset.position();
-      } else {
-        endFileIndex = addedFilesCount(currentSnapshot);
-      }
-
-      MicroBatch latestMicroBatch =
-          MicroBatches.from(currentSnapshot, table.io())
-              .caseSensitive(caseSensitive)
-              .specsById(table.specs())
-              .generate(
-                  currentOffset.position(),
-                  endFileIndex,
-                  Long.MAX_VALUE,
-                  currentOffset.shouldScanAllFiles());
-
-      fileScanTasks.addAll(latestMicroBatch.tasks());
-    } while (currentOffset.snapshotId() != endOffset.snapshotId());
-
-    return fileScanTasks;
-  }
-
-  private boolean shouldProcess(Snapshot snapshot) {
-    String op = snapshot.operation();
-    switch (op) {
-      case DataOperations.APPEND:
-        return true;
-      case DataOperations.REPLACE:
-        return false;
-      case DataOperations.DELETE:
-        Preconditions.checkState(
-            skipDelete,
-            "Cannot process delete snapshot: %s, to ignore deletes, set 
%s=true",
-            snapshot.snapshotId(),
-            SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
-        return false;
-      case DataOperations.OVERWRITE:
-        Preconditions.checkState(
-            skipOverwrite,
-            "Cannot process overwrite snapshot: %s, to ignore overwrites, set 
%s=true",
-            snapshot.snapshotId(),
-            SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
-        return false;
-      default:
-        throw new IllegalStateException(
-            String.format(
-                "Cannot process unknown snapshot operation: %s (snapshot id 
%s)",
-                op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
-    }
-  }
-
-  private static StreamingOffset determineStartingOffset(Table table, Long 
fromTimestamp) {
-    if (table.currentSnapshot() == null) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    if (fromTimestamp == null) {
-      // match existing behavior and start from the oldest snapshot
-      return new 
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
-    }
-
-    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    try {
-      Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table, 
fromTimestamp);
-      if (snapshot != null) {
-        return new StreamingOffset(snapshot.snapshotId(), 0, false);
-      } else {
-        return StreamingOffset.START_OFFSET;
-      }
-    } catch (IllegalStateException e) {
-      // could not determine the first snapshot after the timestamp. use the 
oldest ancestor instead
-      return new 
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+  public void stop() {
+    if (planner != null) {
+      planner.stop();
     }
   }
 
-  private static int getMaxFiles(ReadLimit readLimit) {
-    if (readLimit instanceof ReadMaxFiles) {
-      return ((ReadMaxFiles) readLimit).maxFiles();
-    }
-
-    if (readLimit instanceof CompositeReadLimit) {
-      // We do not expect a CompositeReadLimit to contain a nested 
CompositeReadLimit.
-      // In fact, it should only be a composite of two or more of ReadMinRows, 
ReadMaxRows and
-      // ReadMaxFiles, with no more than one of each.
-      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
-      for (ReadLimit limit : limits) {
-        if (limit instanceof ReadMaxFiles) {
-          return ((ReadMaxFiles) limit).maxFiles();
-        }
-      }
-    }
-
-    // there is no ReadMaxFiles, so return the default
-    return Integer.MAX_VALUE;
-  }
-
-  private static int getMaxRows(ReadLimit readLimit) {
-    if (readLimit instanceof ReadMaxRows) {
-      long maxRows = ((ReadMaxRows) readLimit).maxRows();
-      return Math.toIntExact(maxRows);
-    }
-
-    if (readLimit instanceof CompositeReadLimit) {
-      ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
-      for (ReadLimit limit : limits) {
-        if (limit instanceof ReadMaxRows) {
-          long maxRows = ((ReadMaxRows) limit).maxRows();
-          return Math.toIntExact(maxRows);
-        }
-      }
+  private void initializePlanner(StreamingOffset startOffset, StreamingOffset 
endOffset) {
+    if (readConf.asyncMicroBatchPlanningEnabled()) {
+      this.planner =
+          new AsyncSparkMicroBatchPlanner(
+              table, readConf, startOffset, endOffset, 
lastOffsetForTriggerAvailableNow);
+    } else {
+      this.planner =
+          new SyncSparkMicroBatchPlanner(table, readConf, 
lastOffsetForTriggerAvailableNow);
     }
-
-    // There is no ReadMaxRows, so return the default
-    return Integer.MAX_VALUE;
   }
 
   @Override
-  @SuppressWarnings("checkstyle:CyclomaticComplexity")
   public Offset latestOffset(Offset startOffset, ReadLimit limit) {
-    // calculate end offset get snapshotId from the startOffset
     Preconditions.checkArgument(
         startOffset instanceof StreamingOffset,
         "Invalid start offset: %s is not a StreamingOffset",
         startOffset);
 
-    table.refresh();
-    if (table.currentSnapshot() == null) {
-      return StreamingOffset.START_OFFSET;
-    }
-
-    if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
-      return StreamingOffset.START_OFFSET;
+    if (planner == null) {
+      initializePlanner((StreamingOffset) startOffset, null);
     }
 
-    // end offset can expand to multiple snapshots
-    StreamingOffset startingOffset = (StreamingOffset) startOffset;
-
-    if (startOffset.equals(StreamingOffset.START_OFFSET)) {
-      startingOffset = determineStartingOffset(table, fromTimestamp);
-    }
-
-    Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
-    validateCurrentSnapshotExists(curSnapshot, startingOffset);
-
-    // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
-    long latestSnapshotId =
-        lastOffsetForTriggerAvailableNow != null
-            ? lastOffsetForTriggerAvailableNow.snapshotId()
-            : table.currentSnapshot().snapshotId();
-
-    int startPosOfSnapOffset = (int) startingOffset.position();
-
-    boolean scanAllFiles = startingOffset.shouldScanAllFiles();
-
-    boolean shouldContinueReading = true;
-    int curFilesAdded = 0;
-    long curRecordCount = 0;
-    long curPos = 0;
-
-    // Note : we produce nextOffset with pos as non-inclusive
-    while (shouldContinueReading) {
-      // generate manifest index for the curSnapshot
-      List<Pair<ManifestFile, Integer>> indexedManifests =
-          MicroBatches.skippedManifestIndexesFromSnapshot(
-              table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
-      // this is under assumption we will be able to add at-least 1 file in 
the new offset
-      for (int idx = 0; idx < indexedManifests.size() && 
shouldContinueReading; idx++) {
-        // be rest assured curPos >= startFileIndex
-        curPos = indexedManifests.get(idx).second();
-        try (CloseableIterable<FileScanTask> taskIterable =
-                MicroBatches.openManifestFile(
-                    table.io(),
-                    table.specs(),
-                    caseSensitive,
-                    curSnapshot,
-                    indexedManifests.get(idx).first(),
-                    scanAllFiles);
-            CloseableIterator<FileScanTask> taskIter = 
taskIterable.iterator()) {
-          while (taskIter.hasNext()) {
-            FileScanTask task = taskIter.next();
-            if (curPos >= startPosOfSnapOffset) {
-              if ((curFilesAdded + 1) > getMaxFiles(limit)) {
-                // On including the file it might happen that we might exceed, 
the configured
-                // soft limit on the number of records, since this is a soft 
limit its acceptable.
-                shouldContinueReading = false;
-                break;
-              }
-
-              curFilesAdded += 1;
-              curRecordCount += task.file().recordCount();
-
-              if (curRecordCount >= getMaxRows(limit)) {
-                // we included the file, so increment the number of files
-                // read in the current snapshot.
-                ++curPos;
-                shouldContinueReading = false;
-                break;
-              }
-            }
-            ++curPos;
-          }
-        } catch (IOException ioe) {
-          LOG.warn("Failed to close task iterable", ioe);
-        }
-      }
-      // if the currentSnapShot was also the latestSnapshot then break
-      if (curSnapshot.snapshotId() == latestSnapshotId) {
-        break;
-      }
-
-      // if everything was OK and we consumed complete snapshot then move to 
next snapshot
-      if (shouldContinueReading) {
-        Snapshot nextValid = nextValidSnapshot(curSnapshot);
-        if (nextValid == null) {
-          // nextValid implies all the remaining snapshots should be skipped.
-          break;
-        }
-        // we found the next available snapshot, continue from there.
-        curSnapshot = nextValid;
-        startPosOfSnapOffset = -1;
-        // if anyhow we are moving to next snapshot we should only scan 
addedFiles
-        scanAllFiles = false;
-      }
-    }
-
-    StreamingOffset latestStreamingOffset =
-        new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
-
-    // if no new data arrived, then return null.
-    return latestStreamingOffset.equals(startingOffset) ? null : 
latestStreamingOffset;
-  }
-
-  /**
-   * Get the next snapshot skiping over rewrite and delete snapshots.
-   *
-   * @param curSnapshot the current snapshot
-   * @return the next valid snapshot (not a rewrite or delete snapshot), 
returns null if all
-   *     remaining snapshots should be skipped.
-   */
-  private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
-    Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, 
curSnapshot.snapshotId());
-    // skip over rewrite and delete snapshots
-    while (!shouldProcess(nextSnapshot)) {
-      LOG.debug("Skipping snapshot: {} of table {}", 
nextSnapshot.snapshotId(), table.name());
-      // if the currentSnapShot was also the mostRecentSnapshot then break
-      if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
-        return null;
-      }
-      nextSnapshot = SnapshotUtil.snapshotAfter(table, 
nextSnapshot.snapshotId());
-    }
-    return nextSnapshot;
-  }
-
-  private long addedFilesCount(Snapshot snapshot) {
-    long addedFilesCount =
-        PropertyUtil.propertyAsLong(snapshot.summary(), 
SnapshotSummary.ADDED_FILES_PROP, -1);
-    // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
-    // iterate through addedFiles iterator to find addedFilesCount.
-    return addedFilesCount == -1
-        ? Iterables.size(
-            
SnapshotChanges.builderFor(table).snapshot(snapshot).build().addedDataFiles())
-        : addedFilesCount;
-  }
-
-  private void validateCurrentSnapshotExists(Snapshot snapshot, 
StreamingOffset currentOffset) {
-    if (snapshot == null) {
-      throw new IllegalStateException(
-          String.format(
-              Locale.ROOT,
-              "Cannot load current offset at snapshot %d, the snapshot was 
expired or removed",
-              currentOffset.snapshotId()));
-    }
+    return planner.latestOffset((StreamingOffset) startOffset, limit);
   }
 
   @Override
@@ -553,6 +255,11 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
         (StreamingOffset) latestOffset(initialOffset, 
ReadLimit.allAvailable());
 
     LOG.info("lastOffset for Trigger.AvailableNow is {}", 
lastOffsetForTriggerAvailableNow.json());
+
+    if (planner != null) {
+      planner.stop();
+      planner = null;
+    }
   }
 
   private static class InitialOffsetStore {
@@ -576,7 +283,7 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
       }
 
       table.refresh();
-      StreamingOffset offset = determineStartingOffset(table, fromTimestamp);
+      StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, 
fromTimestamp);
 
       OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
       writeOffset(offset, outputFile);
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..f1b0029c54
--- /dev/null
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class SyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SyncSparkMicroBatchPlanner.class);
+
+  private final boolean caseSensitive;
+  private final long fromTimestamp;
+  private final StreamingOffset lastOffsetForTriggerAvailableNow;
+
+  SyncSparkMicroBatchPlanner(
+      Table table, SparkReadConf readConf, StreamingOffset 
lastOffsetForTriggerAvailableNow) {
+    super(table, readConf);
+    this.caseSensitive = readConf().caseSensitive();
+    this.fromTimestamp = readConf().streamFromTimestamp();
+    this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
+  }
+
+  @Override
+  public List<FileScanTask> planFiles(StreamingOffset startOffset, 
StreamingOffset endOffset) {
+    List<FileScanTask> fileScanTasks = Lists.newArrayList();
+    StreamingOffset batchStartOffset =
+        StreamingOffset.START_OFFSET.equals(startOffset)
+            ? MicroBatchUtils.determineStartingOffset(table(), fromTimestamp)
+            : startOffset;
+
+    StreamingOffset currentOffset = null;
+
+    // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
+    do {
+      long endFileIndex;
+      if (currentOffset == null) {
+        currentOffset = batchStartOffset;
+      } else {
+        Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table(), 
currentOffset.snapshotId());
+        // it may happen that we need to read this snapshot partially in case 
it's equal to
+        // endOffset.
+        if (currentOffset.snapshotId() != endOffset.snapshotId()) {
+          currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L, 
false);
+        } else {
+          currentOffset = endOffset;
+        }
+      }
+
+      Snapshot snapshot = table().snapshot(currentOffset.snapshotId());
+
+      validateCurrentSnapshotExists(snapshot, currentOffset);
+
+      if (!shouldProcess(snapshot)) {
+        LOG.debug("Skipping snapshot: {} of table {}", 
currentOffset.snapshotId(), table().name());
+        continue;
+      }
+
+      Snapshot currentSnapshot = table().snapshot(currentOffset.snapshotId());
+      if (currentOffset.snapshotId() == endOffset.snapshotId()) {
+        endFileIndex = endOffset.position();
+      } else {
+        endFileIndex = MicroBatchUtils.addedFilesCount(table(), 
currentSnapshot);
+      }
+
+      MicroBatch latestMicroBatch =
+          MicroBatches.from(currentSnapshot, table().io())
+              .caseSensitive(caseSensitive)
+              .specsById(table().specs())
+              .generate(
+                  currentOffset.position(),
+                  endFileIndex,
+                  Long.MAX_VALUE,
+                  currentOffset.shouldScanAllFiles());
+
+      fileScanTasks.addAll(latestMicroBatch.tasks());
+    } while (currentOffset.snapshotId() != endOffset.snapshotId());
+
+    return fileScanTasks;
+  }
+
+  @Override
+  @SuppressWarnings("checkstyle:CyclomaticComplexity")
+  public StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit 
limit) {
+    table().refresh();
+    if (table().currentSnapshot() == null) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (table().currentSnapshot().timestampMillis() < fromTimestamp) {
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // end offset can expand to multiple snapshots
+    StreamingOffset startingOffset = startOffset;
+
+    if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+      startingOffset = MicroBatchUtils.determineStartingOffset(table(), 
fromTimestamp);
+    }
+
+    Snapshot curSnapshot = table().snapshot(startingOffset.snapshotId());
+    validateCurrentSnapshotExists(curSnapshot, startingOffset);
+
+    // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
+    long latestSnapshotId =
+        lastOffsetForTriggerAvailableNow != null
+            ? lastOffsetForTriggerAvailableNow.snapshotId()
+            : table().currentSnapshot().snapshotId();
+
+    int startPosOfSnapOffset = (int) startingOffset.position();
+
+    boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+    boolean shouldContinueReading = true;
+    int curFilesAdded = 0;
+    long curRecordCount = 0;
+    int curPos = 0;
+
+    // Extract limits once to avoid repeated calls in tight loop
+    UnpackedLimits unpackedLimits = new UnpackedLimits(limit);
+    long maxFiles = unpackedLimits.getMaxFiles();
+    long maxRows = unpackedLimits.getMaxRows();
+
+    // Note : we produce nextOffset with pos as non-inclusive
+    while (shouldContinueReading) {
+      // generate manifest index for the curSnapshot
+      List<Pair<ManifestFile, Integer>> indexedManifests =
+          MicroBatches.skippedManifestIndexesFromSnapshot(
+              table().io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
+      // this is under assumption we will be able to add at-least 1 file in 
the new offset
+      for (int idx = 0; idx < indexedManifests.size() && 
shouldContinueReading; idx++) {
+        // be rest assured curPos >= startFileIndex
+        curPos = indexedManifests.get(idx).second();
+        try (CloseableIterable<FileScanTask> taskIterable =
+                MicroBatches.openManifestFile(
+                    table().io(),
+                    table().specs(),
+                    caseSensitive,
+                    curSnapshot,
+                    indexedManifests.get(idx).first(),
+                    scanAllFiles);
+            CloseableIterator<FileScanTask> taskIter = 
taskIterable.iterator()) {
+          while (taskIter.hasNext()) {
+            FileScanTask task = taskIter.next();
+            if (curPos >= startPosOfSnapOffset) {
+              if ((curFilesAdded + 1) > maxFiles) {
+                // On including the file it might happen that we might exceed, 
the configured
+                // soft limit on the number of records, since this is a soft 
limit its acceptable.
+                shouldContinueReading = false;
+                break;
+              }
+
+              curFilesAdded += 1;
+              curRecordCount += task.file().recordCount();
+
+              if (curRecordCount >= maxRows) {
+                // we included the file, so increment the number of files
+                // read in the current snapshot.
+                if (curFilesAdded == 1 && curRecordCount > maxRows) {
+                  LOG.warn(
+                      "File {} contains {} records, exceeding 
maxRecordsPerMicroBatch limit of {}. "
+                          + "This file will be processed entirely to guarantee 
forward progress. "
+                          + "Consider increasing the limit or writing smaller 
files to avoid unexpected memory usage.",
+                      task.file().location(),
+                      task.file().recordCount(),
+                      maxRows);
+                }
+                ++curPos;
+                shouldContinueReading = false;
+                break;
+              }
+            }
+            ++curPos;
+          }
+        } catch (IOException ioe) {
+          LOG.warn("Failed to close task iterable", ioe);
+        }
+      }
+      // if the currentSnapShot was also the latestSnapshot then break
+      if (curSnapshot.snapshotId() == latestSnapshotId) {
+        break;
+      }
+
+      // if everything was OK and we consumed complete snapshot then move to 
next snapshot
+      if (shouldContinueReading) {
+        Snapshot nextValid = nextValidSnapshot(curSnapshot);
+        if (nextValid == null) {
+          // nextValid implies all the remaining snapshots should be skipped.
+          break;
+        }
+        // we found the next available snapshot, continue from there.
+        curSnapshot = nextValid;
+        startPosOfSnapOffset = -1;
+        // if anyhow we are moving to next snapshot we should only scan 
addedFiles
+        scanAllFiles = false;
+      }
+    }
+
+    StreamingOffset latestStreamingOffset =
+        new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
+
+    // if no new data arrived, then return null.
+    return latestStreamingOffset.equals(startingOffset) ? null : 
latestStreamingOffset;
+  }
+
+  @Override
+  public void stop() {}
+
+  private void validateCurrentSnapshotExists(Snapshot snapshot, 
StreamingOffset currentOffset) {
+    if (snapshot == null) {
+      throw new IllegalStateException(
+          String.format(
+              Locale.ROOT,
+              "Cannot load current offset at snapshot %d, the snapshot was 
expired or removed",
+              currentOffset.snapshotId()));
+    }
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..b6017e2001
--- /dev/null
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.iceberg.Snapshot;
+import org.junit.jupiter.api.Test;
+
+class TestAsyncSparkMicroBatchPlanner {
+
+  @Test
+  void reachedAvailableNowCapReturnsTrueOnlyForExactCapSnapshot() {
+    Snapshot capSnapshot = mockSnapshot(10L);
+    Snapshot laterSnapshotWithHigherId = mockSnapshot(20L);
+    Snapshot laterSnapshotWithLowerId = mockSnapshot(5L);
+    StreamingOffset capOffset = new StreamingOffset(10L, 3L, false);
+
+    assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(capSnapshot, 
capOffset)).isTrue();
+    assertThat(
+            AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(
+                laterSnapshotWithHigherId, capOffset))
+        .isFalse();
+    assertThat(
+            
AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(laterSnapshotWithLowerId, 
capOffset))
+        .isFalse();
+  }
+
+  @Test
+  void reachedAvailableNowCapReturnsFalseWhenCapOrSnapshotIsMissing() {
+    Snapshot readFrom = mockSnapshot(10L);
+    StreamingOffset capOffset = new StreamingOffset(10L, 1L, false);
+
+    assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(readFrom, 
null)).isFalse();
+    assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(null, 
capOffset)).isFalse();
+  }
+
+  private Snapshot mockSnapshot(long snapshotId) {
+    Snapshot snapshot = mock(Snapshot.class);
+    when(snapshot.snapshotId()).thenReturn(snapshotId);
+    return snapshot;
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
new file mode 100644
index 0000000000..a9ce340fd4
--- /dev/null
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestMicroBatchPlanningUtils extends CatalogTestBase {
+
+  private Table table;
+
+  @BeforeEach
+  public void setupTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql(
+        "CREATE TABLE %s "
+            + "(id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (bucket(3, id))",
+        tableName);
+    this.table = validationCatalog.loadTable(tableIdent);
+  }
+
+  @AfterEach
+  public void dropTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testUnpackedLimitsCompositeChoosesMinimum() {
+    ReadLimit[] limits =
+        new ReadLimit[] {
+          ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), 
ReadLimit.maxFiles(2)
+        };
+
+    ReadLimit composite = ReadLimit.compositeLimit(limits);
+
+    BaseSparkMicroBatchPlanner.UnpackedLimits unpacked =
+        new BaseSparkMicroBatchPlanner.UnpackedLimits(composite);
+
+    assertThat(unpacked.getMaxRows()).isEqualTo(4);
+    assertThat(unpacked.getMaxFiles()).isEqualTo(2);
+  }
+
+  @TestTemplate
+  public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() {
+    sql("INSERT INTO %s VALUES (1, 'one')", tableName);
+    table.refresh();
+    long snapshot1Time = table.currentSnapshot().timestampMillis();
+
+    sql("INSERT INTO %s VALUES (2, 'two')", tableName);
+    table.refresh();
+    long snapshot2Id = table.currentSnapshot().snapshotId();
+
+    StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, 
snapshot1Time + 1);
+
+    assertThat(offset.snapshotId()).isEqualTo(snapshot2Id);
+    assertThat(offset.position()).isEqualTo(0L);
+    assertThat(offset.shouldScanAllFiles()).isFalse();
+  }
+
+  @TestTemplate
+  public void testAddedFilesCountUsesSummaryWhenPresent() {
+    sql("INSERT INTO %s VALUES (1, 'one')", tableName);
+    table.refresh();
+
+    long expectedAddedFiles =
+        
Long.parseLong(table.currentSnapshot().summary().get(SnapshotSummary.ADDED_FILES_PROP));
+
+    long actual = MicroBatchUtils.addedFilesCount(table, 
table.currentSnapshot());
+
+    assertThat(actual).isEqualTo(expectedAddedFiles);
+  }
+}
diff --git 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 2f73d51f04..2b2be95c5c 100644
--- 
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ 
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -31,13 +31,17 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -50,15 +54,22 @@ import org.apache.iceberg.data.FileHelpers;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadConf;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.OutputMode;
@@ -73,10 +84,73 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(ParameterizedTestExtension.class)
 public final class TestStructuredStreamingRead3 extends CatalogTestBase {
 
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, 
async = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties(),
+        false
+      },
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties(),
+        true
+      },
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties(),
+        false
+      },
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties(),
+        true
+      },
+      {
+        SparkCatalogConfig.REST.catalogName(),
+        SparkCatalogConfig.REST.implementation(),
+        ImmutableMap.builder()
+            .putAll(SparkCatalogConfig.REST.properties())
+            .put(CatalogProperties.URI, 
restCatalog.properties().get(CatalogProperties.URI))
+            .build(),
+        false
+      },
+      {
+        SparkCatalogConfig.REST.catalogName(),
+        SparkCatalogConfig.REST.implementation(),
+        ImmutableMap.builder()
+            .putAll(SparkCatalogConfig.REST.properties())
+            .put(CatalogProperties.URI, 
restCatalog.properties().get(CatalogProperties.URI))
+            .build(),
+        true
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        false
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties(),
+        true
+      }
+    };
+  }
+
   private Table table;
 
   private final AtomicInteger microBatches = new AtomicInteger();
 
+  @Parameter(index = 3)
+  private Boolean async;
+
   /**
    * test data to be used by multiple writes each write creates a snapshot and 
writes a list of
    * records
@@ -158,7 +232,6 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   @TestTemplate
   public void testReadStreamWithMaxFiles1() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
@@ -172,7 +245,6 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   @TestTemplate
   public void testReadStreamWithMaxFiles2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"2"),
         List.of(3L, 2L, 2L));
@@ -206,7 +278,6 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   @TestTemplate
   public void testReadStreamWithMaxRows2() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"2"),
         List.of(3L, 2L, 2L));
@@ -227,7 +298,6 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   @TestTemplate
   public void testReadStreamWithMaxRows4() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"4"), List.of(4L, 3L));
 
@@ -240,13 +310,10 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   @TestTemplate
   public void testReadStreamWithCompositeReadLimit() throws Exception {
     appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
     assertMicroBatchRecordSizes(
         ImmutableMap.of(
-            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
-            "4",
-            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
-            "1"),
+            SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+            SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
 
     assertMicroBatchRecordSizes(
@@ -257,15 +324,41 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         Trigger.AvailableNow());
   }
 
+  @TestTemplate
+  public void testReadStreamWithLowAsyncQueuePreload() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    // Set low preload limits to test async queue behavior - background thread 
should load
+    // remaining data
+
+    StreamingQuery query =
+        startStream(
+            ImmutableMap.of(
+                SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT,
+                "5",
+                SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT,
+                "5"));
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    assertThat(actual)
+        
.containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
+  }
+
   @TestTemplate
   public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws 
Exception {
     File writerCheckpointFolder = 
temp.resolve("writer-checkpoint-folder").toFile();
     File writerCheckpoint = new File(writerCheckpointFolder, 
"writer-checkpoint");
     File output = temp.resolve("junit").toFile();
 
+    Map<String, String> options = Maps.newHashMap();
+    options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
+
     DataStreamWriter querySource =
         spark
             .readStream()
+            .options(options)
             .format("iceberg")
             .load(tableName)
             .writeStream()
@@ -320,10 +413,17 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     long expectedSnapshotId = table.currentSnapshot().snapshotId();
 
     String sinkTable = "availablenow_sink";
+    Map<String, String> options = Maps.newHashMap();
+    options.put(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1");
+    options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
+
     StreamingQuery query =
         spark
             .readStream()
-            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+            .options(options)
             .format("iceberg")
             .load(tableName)
             .writeStream()
@@ -365,6 +465,142 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     
assertThat(actualResults).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedData));
   }
 
+  @TestTemplate
+  public void testTriggerAvailableNowCapsAsyncPreloadAfterPrepare() {
+    List<List<SimpleRecord>> initialData =
+        List.of(List.of(new SimpleRecord(1, "one")), List.of(new 
SimpleRecord(2, "two")));
+    appendDataAsMultipleSnapshots(initialData);
+
+    table.refresh();
+    long expectedCapSnapshotId = table.currentSnapshot().snapshotId();
+
+    SparkMicroBatchStream stream =
+        new SparkMicroBatchStream(
+            JavaSparkContext.fromSparkContext(spark.sparkContext()),
+            table,
+            table::io,
+            new SparkReadConf(
+                spark,
+                table,
+                ImmutableMap.of(
+                    SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
+                    async.toString(),
+                    SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
+                    "1",
+                    SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
+                    "1",
+                    SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT,
+                    "10",
+                    SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT,
+                    "10")),
+            table.schema(),
+            temp.resolve("available-now-cap-checkpoint").toString());
+
+    try {
+      stream.prepareForTriggerAvailableNow();
+
+      appendData(List.of(new SimpleRecord(3, "three")));
+
+      Offset startOffset = stream.initialOffset();
+      Offset firstEndOffset = stream.latestOffset(startOffset, 
stream.getDefaultReadLimit());
+      assertThat(firstEndOffset).isNotNull();
+      stream.planInputPartitions(startOffset, firstEndOffset);
+
+      Offset secondEndOffset = stream.latestOffset(firstEndOffset, 
stream.getDefaultReadLimit());
+      assertThat(secondEndOffset).isNotNull();
+      stream.planInputPartitions(firstEndOffset, secondEndOffset);
+
+      assertThat(stream.latestOffset(secondEndOffset, 
stream.getDefaultReadLimit())).isNull();
+      assertThat(((StreamingOffset) 
secondEndOffset).snapshotId()).isEqualTo(expectedCapSnapshotId);
+    } finally {
+      stream.stop();
+    }
+  }
+
+  @TestTemplate
+  public void testLatestOffsetReturnsNullAfterFinalBatchIsConsumed() throws 
Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    table.refresh();
+    int expectedBatchCount;
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      expectedBatchCount = Iterables.size(tasks);
+    }
+
+    SparkMicroBatchStream stream =
+        newMicroBatchStream(
+            
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"),
+            "drain-to-null-checkpoint");
+
+    try {
+      int plannedBatchCount = 0;
+      Offset startOffset = stream.initialOffset();
+      Offset endOffset = stream.latestOffset(startOffset, 
stream.getDefaultReadLimit());
+      while (endOffset != null) {
+        InputPartition[] partitions = stream.planInputPartitions(startOffset, 
endOffset);
+        assertThat(partitions).isNotEmpty();
+        plannedBatchCount += 1;
+        startOffset = endOffset;
+        endOffset = stream.latestOffset(startOffset, 
stream.getDefaultReadLimit());
+      }
+
+      assertThat(endOffset).isNull();
+      assertThat(plannedBatchCount).isEqualTo(expectedBatchCount);
+    } finally {
+      stream.stop();
+    }
+  }
+
+  @TestTemplate
+  public void testPlanInputPartitionsIsIdempotentForSameOffsets() {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    SparkMicroBatchStream stream =
+        newMicroBatchStream(
+            
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"),
+            "idempotent-plan-files-checkpoint");
+
+    try {
+      Offset startOffset = stream.initialOffset();
+      Offset endOffset = stream.latestOffset(startOffset, 
stream.getDefaultReadLimit());
+
+      assertThat(endOffset).isNotNull();
+
+      InputPartition[] firstPartitions = 
stream.planInputPartitions(startOffset, endOffset);
+      InputPartition[] secondPartitions = 
stream.planInputPartitions(startOffset, endOffset);
+
+      List<String> firstFileLocations = Lists.newArrayList();
+      for (InputPartition partition : firstPartitions) {
+        SparkInputPartition sparkInputPartition = (SparkInputPartition) 
partition;
+        for (FileScanTask task : 
sparkInputPartition.<FileScanTask>taskGroup().tasks()) {
+          firstFileLocations.add(task.file().location());
+        }
+      }
+
+      List<String> secondFileLocations = Lists.newArrayList();
+      for (InputPartition partition : secondPartitions) {
+        SparkInputPartition sparkInputPartition = (SparkInputPartition) 
partition;
+        for (FileScanTask task : 
sparkInputPartition.<FileScanTask>taskGroup().tasks()) {
+          secondFileLocations.add(task.file().location());
+        }
+      }
+
+      
assertThat(firstFileLocations).containsExactlyInAnyOrderElementsOf(secondFileLocations);
+
+      startOffset = endOffset;
+      endOffset = stream.latestOffset(startOffset, 
stream.getDefaultReadLimit());
+      while (endOffset != null) {
+        assertThat(stream.planInputPartitions(startOffset, 
endOffset)).isNotEmpty();
+        startOffset = endOffset;
+        endOffset = stream.latestOffset(startOffset, 
stream.getDefaultReadLimit());
+      }
+
+      assertThat(endOffset).isNull();
+    } finally {
+      stream.stop();
+    }
+  }
+
   @TestTemplate
   public void testReadStreamOnIcebergThenAddData() throws Exception {
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -394,7 +630,7 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, 
Long.toString(streamStartTimestamp));
 
     List<SimpleRecord> empty = rowsAvailable(query);
-    assertThat(empty.isEmpty()).isTrue();
+    assertThat(empty).isEmpty();
 
     List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
     appendDataAsMultipleSnapshots(expected);
@@ -412,7 +648,7 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP, 
Long.toString(futureTimestamp));
 
     List<SimpleRecord> actual = rowsAvailable(query);
-    assertThat(actual.isEmpty()).isTrue();
+    assertThat(actual).isEmpty();
 
     List<SimpleRecord> data =
         Lists.newArrayList(
@@ -425,13 +661,15 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         .forEach(
             x -> {
               appendData(data);
-              assertThat(rowsAvailable(query).isEmpty()).isTrue();
+              assertThat(rowsAvailable(query)).isEmpty();
             });
 
     waitUntilAfter(futureTimestamp);
 
     // Data appended after the timestamp should appear
     appendData(data);
+    // Allow async background thread to refresh, else test sometimes fails
+    Thread.sleep(50);
     actual = rowsAvailable(query);
     assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
   }
@@ -674,7 +912,6 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     appendDataAsMultipleSnapshots(expected);
 
     makeRewriteDataFiles();
-
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
@@ -688,10 +925,8 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     appendDataAsMultipleSnapshots(expected);
 
     makeRewriteDataFiles();
-
     assertMicroBatchRecordSizes(
-        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"4"),
-        List.of(5L, 2L));
+        ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, 
"4"), List.of(4L, 3L));
   }
 
   @TestTemplate
@@ -719,7 +954,6 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     makeRewriteDataFiles();
     makeRewriteDataFiles();
-
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L));
@@ -735,7 +969,6 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     makeRewriteDataFiles();
 
     appendDataAsMultipleSnapshots(expected);
-
     assertMicroBatchRecordSizes(
         ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, 
"1"),
         List.of(1L, 2L, 1L, 1L, 1L, 1L, 1L, 2L, 1L, 1L, 1L, 1L));
@@ -884,13 +1117,18 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   private static final String MEMORY_TABLE = "_stream_view_mem";
 
   private StreamingQuery startStream(Map<String, String> options) throws 
TimeoutException {
+    Map<String, String> allOptions = Maps.newHashMap(options);
+    allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
     return spark
         .readStream()
-        .options(options)
+        .options(allOptions)
         .format("iceberg")
         .load(tableName)
         .writeStream()
-        .options(options)
+        .options(allOptions)
         .format("memory")
         .queryName(MEMORY_TABLE)
         .outputMode(OutputMode.Append())
@@ -915,11 +1153,17 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   private void assertMicroBatchRecordSizes(
       Map<String, String> options, List<Long> expectedMicroBatchRecordSize, 
Trigger trigger)
       throws TimeoutException {
-    Dataset<Row> ds = 
spark.readStream().options(options).format("iceberg").load(tableName);
-    List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
+    Map<String, String> allOptions = Maps.newHashMap(options);
+    allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
+
+    Dataset<Row> ds = 
spark.readStream().options(allOptions).format("iceberg").load(tableName);
 
+    List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
     ds.writeStream()
-        .options(options)
+        .options(allOptions)
         .trigger(trigger)
         .foreachBatch(
             (VoidFunction2<Dataset<Row>, Long>)
@@ -941,4 +1185,21 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         .as(Encoders.bean(SimpleRecord.class))
         .collectAsList();
   }
+
+  private SparkMicroBatchStream newMicroBatchStream(
+      Map<String, String> options, String checkpointDirName) {
+    Map<String, String> allOptions = Maps.newHashMap(options);
+    allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
+
+    return new SparkMicroBatchStream(
+        JavaSparkContext.fromSparkContext(spark.sparkContext()),
+        table,
+        table::io,
+        new SparkReadConf(spark, table, allOptions),
+        table.schema(),
+        temp.resolve(checkpointDirName).toString());
+  }
 }

Reply via email to