This is an automated email from the ASF dual-hosted git repository.

bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 8b7f6d773f Spark 4.1: New Async Spark Micro Batch Planner (#15299)
8b7f6d773f is described below

commit 8b7f6d773f99d4b581f151efbd871c8965f2b3cb
Author: Ruijing Li <[email protected]>
AuthorDate: Mon Mar 16 07:54:58 2026 -0700

    Spark 4.1: New Async Spark Micro Batch Planner (#15299)
---
 docs/docs/spark-configuration.md                   |   5 +
 docs/docs/spark-structured-streaming.md            |   7 +
 .../org/apache/iceberg/spark/SparkReadConf.java    |  33 ++
 .../org/apache/iceberg/spark/SparkReadOptions.java |  13 +
 .../apache/iceberg/spark/SparkSQLProperties.java   |   5 +
 .../spark/source/AsyncSparkMicroBatchPlanner.java  | 527 +++++++++++++++++++++
 .../spark/source/BaseSparkMicroBatchPlanner.java   |  20 +-
 .../spark/source/SparkMicroBatchStream.java        |  11 +-
 .../spark/source/SyncSparkMicroBatchPlanner.java   |   9 +
 .../spark/source/TestStructuredStreamingRead3.java | 126 ++++-
 10 files changed, 743 insertions(+), 13 deletions(-)

diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md
index 6dee824485..8193d1dc35 100644
--- a/docs/docs/spark-configuration.md
+++ b/docs/docs/spark-configuration.md
@@ -196,6 +196,7 @@ val spark = SparkSession.builder()
 | spark.sql.iceberg.executor-cache.locality.enabled      | false               
                                           | Enables locality-aware executor 
cache usage                                                                     
                |
 | spark.sql.iceberg.merge-schema                         | false               
                                           | Enables modifying the table schema 
to match the write schema. Only adds missing columns                            
             |
 | spark.sql.iceberg.report-column-stats                  | true                
                                           | Report Puffin Table Statistics if 
available to Spark's Cost Based Optimizer. CBO must be enabled for this to be 
effective       |
+| spark.sql.iceberg.async-micro-batch-planning-enabled   | false               
                                           | Enables asynchronous microbatch 
planning to reduce planning latency by pre-fetching file scan tasks             
                |
 
 ### Read options
 
@@ -220,6 +221,10 @@ spark.read
 | stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; 
if before the oldest known ancestor snapshot, the oldest will be used           
                                                  |
 | streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per 
microbatch                                                                      
                                                                  |
 | streaming-max-rows-per-micro-batch  | INT_MAX | "Soft maximum" number of 
rows per microbatch; always includes all rows in next unprocessed file, 
excludes additional files if their inclusion would exceed the soft max limit |
+| async-micro-batch-planning-enabled      | false                     | 
Enables asynchronous microbatch planning to reduce planning latency by 
pre-fetching file scan tasks                                                    
                       |
+| streaming-snapshot-polling-interval-ms  | 30000                     | 
Overrides the polling time for the async planner to refresh and detect new 
snapshots. Only takes effect when async-micro-batch-planning-enabled is set     
                       |
+| async-queue-preload-file-limit          | 100                       | 
Overrides the number of files initially loaded into the background queue. Tune 
to prevent queue starvation. Only takes effect when 
async-micro-batch-planning-enabled is set             |
+| async-queue-preload-row-limit           | 100000                    | 
Overrides the number of rows initially loaded into the background queue. Tune 
to prevent queue starvation. Only takes effect when 
async-micro-batch-planning-enabled is set              |
 
 ### Write options
 
diff --git a/docs/docs/spark-structured-streaming.md 
b/docs/docs/spark-structured-streaming.md
index e722df1ea4..3313f8150b 100644
--- a/docs/docs/spark-structured-streaming.md
+++ b/docs/docs/spark-structured-streaming.md
@@ -63,6 +63,13 @@ val df = spark.readStream
 !!! info
     Note: In addition to limiting micro-batch sizes on queries that use the 
default trigger (i.e. `Trigger.ProcessingTime`), rate limiting options can be 
applied to queries that use `Trigger.AvailableNow` to split one-time processing 
of all available source data into multiple micro-batches for better query 
scalability. Rate limiting options will be ignored when using the deprecated 
`Trigger.Once` trigger.
 
+### Asynchronous Micro-Batch Planning
+
+Users can enable asynchronous micro-batch planning by setting 
`async-micro-batch-planning-enabled` to true. With this option enabled, Iceberg 
will start processing the current micro-batch while planning the next 
micro-batches in parallel.
+This can help improve query throughput by reducing idle time between 
micro-batches. Users should weigh the tradeoffs, which include higher memory 
usage and increased snapshot detection latency.
+
+Users can also set additional options to control the behavior of asynchronous 
micro-batch planning, found in the [Spark 
configuration](spark-configuration.md#read-options).
+
 ## Streaming Writes
 
 To write values from streaming query to Iceberg table, use `DataStreamWriter`:
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 36c95b3aaf..309d7c4fd1 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -225,6 +225,39 @@ public class SparkReadConf {
         .parse();
   }
 
+  public boolean asyncMicroBatchPlanningEnabled() {
+    return confParser
+        .booleanConf()
+        .option(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
+        .sessionConf(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
+        
.defaultValue(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT)
+        .parse();
+  }
+
+  public long streamingSnapshotPollingIntervalMs() {
+    return confParser
+        .longConf()
+        .option(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS)
+        
.defaultValue(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT)
+        .parse();
+  }
+
+  public long asyncQueuePreloadFileLimit() {
+    return confParser
+        .longConf()
+        .option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT)
+        .defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT)
+        .parse();
+  }
+
+  public long asyncQueuePreloadRowLimit() {
+    return confParser
+        .longConf()
+        .option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT)
+        .defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT)
+        .parse();
+  }
+
   public boolean preserveDataGrouping() {
     return confParser
         .booleanConf()
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 829fb9cdd2..e6d02d1047 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -84,6 +84,19 @@ public class SparkReadOptions {
   public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
       "streaming-max-rows-per-micro-batch";
 
+  // Enable async micro batch planning
+  public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
+      "async-micro-batch-planning-enabled";
+  // Polling interval for async planner to refresh table metadata (ms)
+  public static final String STREAMING_SNAPSHOT_POLLING_INTERVAL_MS =
+      "streaming-snapshot-polling-interval-ms";
+  public static final long STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT = 
30000L;
+  // Initial queue preload limits for async micro batch planner
+  public static final String ASYNC_QUEUE_PRELOAD_FILE_LIMIT = 
"async-queue-preload-file-limit";
+  public static final long ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT = 100L;
+  public static final String ASYNC_QUEUE_PRELOAD_ROW_LIMIT = 
"async-queue-preload-row-limit";
+  public static final long ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT = 100000L;
+
   // Table path
   public static final String PATH = "path";
 
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 14b2b8c958..867370ea44 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -112,4 +112,9 @@ public class SparkSQLProperties {
 
   // Prefix for custom snapshot properties
   public static final String SNAPSHOT_PROPERTY_PREFIX = 
"spark.sql.iceberg.snapshot-property.";
+
+  // Controls whether to enable async micro batch planning for session
+  public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
+      "spark.sql.iceberg.async-micro-batch-planning-enabled";
+  public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = 
false;
 }
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..527b41cdcf
--- /dev/null
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.connector.read.streaming.ReadAllAvailable;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner 
implements AutoCloseable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AsyncSparkMicroBatchPlanner.class);
+  private static final int PLAN_FILES_CACHE_MAX_SIZE = 10;
+  private static final long QUEUE_POLL_TIMEOUT_MS = 100L; // 100 ms
+
+  private final long minQueuedFiles;
+  private final long minQueuedRows;
+
+  // Cache for planFiles results to handle duplicate calls
+  private final Cache<Pair<StreamingOffset, StreamingOffset>, 
List<FileScanTask>> planFilesCache;
+
+  // Queue to buffer pre-fetched file scan tasks
+  private final LinkedBlockingQueue<Pair<StreamingOffset, FileScanTask>> queue;
+
+  // Background executor for async operations
+  private final ScheduledExecutorService executor;
+
+  // Error tracking
+  private volatile Throwable refreshFailedThrowable;
+  private volatile Throwable fillQueueFailedThrowable;
+
+  // Tracking queue state
+  private final AtomicLong queuedFileCount = new AtomicLong(0);
+  private final AtomicLong queuedRowCount = new AtomicLong(0);
+  private volatile Pair<StreamingOffset, FileScanTask> tail;
+  private Snapshot lastQueuedSnapshot;
+  private boolean stopped;
+
+  // Cap for Trigger.AvailableNow - don't process beyond this offset
+  private final StreamingOffset lastOffsetForTriggerAvailableNow;
+
+  /**
+   * This class manages a queue of FileScanTask + StreamingOffset. On 
creation, it starts up an
+   * asynchronous polling process which populates the queue when a new 
snapshot arrives or the
+   * minimum amount of queued data is too low.
+   *
+   * <p>Note: this will capture the state of the table when snapshots are 
added to the queue. If a
+   * snapshot is expired after being added to the queue, the job will still 
process it.
+   */
+  AsyncSparkMicroBatchPlanner(
+      Table table,
+      SparkReadConf readConf,
+      StreamingOffset initialOffset,
+      StreamingOffset maybeEndOffset,
+      StreamingOffset lastOffsetForTriggerAvailableNow) {
+    super(table, readConf);
+    this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
+    this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
+    this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
+    this.planFilesCache = 
Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build();
+    this.queue = new LinkedBlockingQueue<>();
+
+    table().refresh();
+    // Synchronously add data to the queue to meet our initial constraints
+    fillQueue(initialOffset, maybeEndOffset);
+
+    this.executor =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread thread = new Thread(r, "iceberg-async-planner-" + 
table().name());
+              thread.setDaemon(true);
+              return thread;
+            });
+    // Schedule table refresh at configured interval
+    long pollingIntervalMs = readConf().streamingSnapshotPollingIntervalMs();
+    this.executor.scheduleWithFixedDelay(
+        this::refreshAndTrapException, pollingIntervalMs, pollingIntervalMs, 
TimeUnit.MILLISECONDS);
+    // Schedule queue fill to run frequently (use polling interval for tests, 
cap at 100ms for
+    // production)
+    long queueFillIntervalMs = Math.min(QUEUE_POLL_TIMEOUT_MS, 
pollingIntervalMs);
+    executor.scheduleWithFixedDelay(
+        () -> fillQueueAndTrapException(lastQueuedSnapshot),
+        0,
+        queueFillIntervalMs,
+        TimeUnit.MILLISECONDS);
+
+    LOG.info(
+        "Started AsyncSparkMicroBatchPlanner for {} from initialOffset: {}",
+        table().name(),
+        initialOffset);
+  }
+
+  @Override
+  public synchronized void stop() {
+    Preconditions.checkArgument(
+        !stopped, "AsyncSparkMicroBatchPlanner for {} was already stopped", 
table().name());
+    stopped = true;
+    LOG.info("Stopping AsyncSparkMicroBatchPlanner for table: {}", 
table().name());
+    executor.shutdownNow();
+    boolean terminated = false;
+    try {
+      terminated =
+          executor.awaitTermination(
+              readConf().streamingSnapshotPollingIntervalMs() * 2, 
TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ignored) {
+      // Restore interrupt status
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("AsyncSparkMicroBatchPlanner for table: {}, stopped: {}", 
table().name(), terminated);
+  }
+
+  @Override
+  public void close() {
+    stop();
+  }
+
+  /**
+   * Spark can call this multiple times; it should produce the same answer 
every time.
+   *
+   * @param startOffset the starting offset of this microbatch, position is 
inclusive
+   * @param endOffset the end offset of this microbatch, position is exclusive
+   * @return the list of files to scan between these offsets
+   */
+  @Override
+  public synchronized List<FileScanTask> planFiles(
+      StreamingOffset startOffset, StreamingOffset endOffset) {
+    return planFilesCache.get(
+        Pair.of(startOffset, endOffset),
+        key -> {
+          LOG.info(
+              "running planFiles for {}, startOffset: {}, endOffset: {}",
+              table().name(),
+              startOffset,
+              endOffset);
+          List<FileScanTask> result = new LinkedList<>();
+          Pair<StreamingOffset, FileScanTask> elem;
+          StreamingOffset currentOffset;
+          boolean shouldTerminate = false;
+          long filesInPlan = 0;
+          long rowsInPlan = 0;
+
+          do {
+            // Synchronize here since we are polling, checking for empty and 
updating tail
+            synchronized (queue) {
+              try {
+                elem = queue.poll(QUEUE_POLL_TIMEOUT_MS, 
TimeUnit.MILLISECONDS);
+              } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new RuntimeException("Interrupted while polling queue", 
e);
+              }
+              if (queue.isEmpty()) {
+                tail = null;
+              }
+            }
+
+            if (elem != null) {
+              currentOffset = elem.first();
+              LOG.debug("planFiles consumed: {}", currentOffset);
+              FileScanTask currentTask = elem.second();
+              filesInPlan += 1;
+              long elemRows = currentTask.file().recordCount();
+              rowsInPlan += elemRows;
+              queuedFileCount.decrementAndGet();
+              queuedRowCount.addAndGet(-elemRows);
+              result.add(currentTask);
+
+              // try to peek at the next entry of the queue and see if we 
should stop
+              Pair<StreamingOffset, FileScanTask> nextElem = queue.peek();
+              boolean endOffsetPeek = false;
+              if (nextElem != null) {
+                endOffsetPeek = endOffset.equals(nextElem.first());
+              }
+              // end offset may be synthetic and not exist in the queue
+              boolean endOffsetSynthetic =
+                  currentOffset.snapshotId() == endOffset.snapshotId()
+                      && (currentOffset.position() + 1) == 
endOffset.position();
+              shouldTerminate = endOffsetPeek || endOffsetSynthetic;
+            } else {
+              LOG.trace("planFiles hasn't reached {}, waiting", endOffset);
+            }
+          } while (!shouldTerminate && refreshFailedThrowable == null);
+
+          if (refreshFailedThrowable != null) {
+            throw new RuntimeException("Table refresh failed", 
refreshFailedThrowable);
+          }
+
+          LOG.info(
+              "completed planFiles for {}, startOffset: {}, endOffset: {}, 
files: {}, rows: {}",
+              table().name(),
+              startOffset,
+              endOffset,
+              filesInPlan,
+              rowsInPlan);
+          return result;
+        });
+  }
+
+  /**
+   * This needs to be non destructive on the queue as spark could call this 
multiple times. Each
+   * time, depending on the table state it could return something different
+   *
+   * @param startOffset the starting offset of the next microbatch
+   * @param limit a limit for how many files/bytes/rows the next microbatch 
should include
+   * @return The end offset to use for the next microbatch, null signals that 
no data is available
+   */
+  @Override
+  public synchronized StreamingOffset latestOffset(StreamingOffset 
startOffset, ReadLimit limit) {
+    LOG.info(
+        "running latestOffset for {}, startOffset: {}, limit: {}",
+        table().name(),
+        startOffset,
+        limit);
+
+    if (table().currentSnapshot() == null) {
+      LOG.info("latestOffset returning START_OFFSET, currentSnapshot() is 
null");
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (table().currentSnapshot().timestampMillis() < 
readConf().streamFromTimestamp()) {
+      LOG.info("latestOffset returning START_OFFSET, currentSnapshot() < 
fromTimestamp");
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // if any exceptions were encountered in the background process, raise 
them here
+    if (refreshFailedThrowable != null) {
+      throw new RuntimeException(refreshFailedThrowable);
+    }
+    if (fillQueueFailedThrowable != null) {
+      throw new RuntimeException(fillQueueFailedThrowable);
+    }
+
+    // if we want to read all available we don't need to scan files, just 
snapshots
+    if (limit instanceof ReadAllAvailable) {
+      // If Trigger.AvailableNow cap is set, return it directly
+      if (this.lastOffsetForTriggerAvailableNow != null) {
+        return this.lastOffsetForTriggerAvailableNow;
+      }
+      Snapshot lastValidSnapshot = table().snapshot(startOffset.snapshotId());
+      Snapshot nextValidSnapshot;
+      do {
+        nextValidSnapshot = nextValidSnapshot(lastValidSnapshot);
+        if (nextValidSnapshot != null) {
+          lastValidSnapshot = nextValidSnapshot;
+        }
+      } while (nextValidSnapshot != null);
+      return new StreamingOffset(
+          lastValidSnapshot.snapshotId(),
+          MicroBatchUtils.addedFilesCount(table(), lastValidSnapshot),
+          false);
+    }
+
+    return computeLimitedOffset(limit);
+  }
+
+  private StreamingOffset computeLimitedOffset(ReadLimit limit) {
+    UnpackedLimits unpackedLimits = new UnpackedLimits(limit);
+    long rowsSeen = 0;
+    long filesSeen = 0;
+    LOG.debug(
+        "latestOffset queue status, queuedFiles: {}, queuedRows: {}",
+        queuedFileCount.get(),
+        queuedRowCount.get());
+
+    // Convert to list for indexed access
+    List<Pair<StreamingOffset, FileScanTask>> queueList = 
Lists.newArrayList(queue);
+    for (int i = 0; i < queueList.size(); i++) {
+      Pair<StreamingOffset, FileScanTask> elem = queueList.get(i);
+      long fileRows = elem.second().file().recordCount();
+
+      // Hard limit on files - stop BEFORE exceeding
+      if (filesSeen + 1 > unpackedLimits.getMaxFiles()) {
+        if (filesSeen == 0) {
+          return null;
+        }
+        LOG.debug(
+            "latestOffset hit file limit at {}, rows: {}, files: {}",
+            elem.first(),
+            rowsSeen,
+            filesSeen);
+        return elem.first();
+      }
+
+      // Soft limit on rows - include file FIRST, then check
+      rowsSeen += fileRows;
+      filesSeen += 1;
+
+      // Check if we've hit the row limit after including this file
+      if (rowsSeen >= unpackedLimits.getMaxRows()) {
+        if (filesSeen == 1 && rowsSeen > unpackedLimits.getMaxRows()) {
+          LOG.warn(
+              "File {} at offset {} contains {} records, exceeding 
maxRecordsPerMicroBatch limit of {}. "
+                  + "This file will be processed entirely to guarantee forward 
progress. "
+                  + "Consider increasing the limit or writing smaller files to 
avoid unexpected memory usage.",
+              elem.second().file().location(),
+              elem.first(),
+              fileRows,
+              unpackedLimits.getMaxRows());
+        }
+        // Return the offset of the NEXT element (or synthesize tail+1)
+        if (i + 1 < queueList.size()) {
+          LOG.debug(
+              "latestOffset hit row limit at {}, rows: {}, files: {}",
+              queueList.get(i + 1).first(),
+              rowsSeen,
+              filesSeen);
+          return queueList.get(i + 1).first();
+        } else {
+          // This is the last element - return tail+1
+          StreamingOffset current = elem.first();
+          StreamingOffset result =
+              new StreamingOffset(
+                  current.snapshotId(), current.position() + 1, 
current.shouldScanAllFiles());
+          LOG.debug(
+              "latestOffset hit row limit at tail {}, rows: {}, files: {}",
+              result,
+              rowsSeen,
+              filesSeen);
+          return result;
+        }
+      }
+    }
+
+    // if we got here there aren't enough files to exceed our limits
+    if (tail != null) {
+      StreamingOffset tailOffset = tail.first();
+      // we have to increment the position by 1 since we want to include the 
tail in the read and
+      // position is non-inclusive
+      StreamingOffset latestOffset =
+          new StreamingOffset(
+              tailOffset.snapshotId(), tailOffset.position() + 1, 
tailOffset.shouldScanAllFiles());
+      LOG.debug("latestOffset returning all queued data {}", latestOffset);
+      return latestOffset;
+    }
+
+    // if we got here the queue is empty
+    LOG.debug("latestOffset no data, returning null");
+    return null;
+  }
+
+  // Background task wrapper that traps exceptions
+  private void refreshAndTrapException() {
+    try {
+      table().refresh();
+    } catch (Throwable t) {
+      LOG.error("Failed to refresh table {}", table().name(), t);
+      refreshFailedThrowable = t;
+    }
+  }
+
+  // Background task wrapper that traps exceptions
+  private void fillQueueAndTrapException(Snapshot snapshot) {
+    try {
+      fillQueue(snapshot);
+    } catch (Throwable t) {
+      LOG.error("Failed to fill queue for table {}", table().name(), t);
+      fillQueueFailedThrowable = t;
+    }
+  }
+
+  /** Generate a MicroBatch based on input parameters and add to the queue */
+  private void addMicroBatchToQueue(
+      Snapshot snapshot, long startFileIndex, long endFileIndex, boolean 
shouldScanAllFile) {
+    LOG.info("Adding MicroBatch for snapshot: {} to the queue", 
snapshot.snapshotId());
+    MicroBatches.MicroBatch microBatch =
+        MicroBatches.from(snapshot, table().io())
+            .caseSensitive(readConf().caseSensitive())
+            .specsById(table().specs())
+            .generate(startFileIndex, endFileIndex, Long.MAX_VALUE, 
shouldScanAllFile);
+
+    long position = startFileIndex;
+    for (FileScanTask task : microBatch.tasks()) {
+      Pair<StreamingOffset, FileScanTask> elem =
+          Pair.of(new StreamingOffset(microBatch.snapshotId(), position, 
shouldScanAllFile), task);
+      queuedFileCount.incrementAndGet();
+      queuedRowCount.addAndGet(task.file().recordCount());
+      // I have to synchronize here so queue and tail can never be out of sync
+      synchronized (queue) {
+        queue.add(elem);
+        tail = elem;
+      }
+      position += 1;
+    }
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("\n");
+      for (Pair<StreamingOffset, FileScanTask> elem : queue) {
+        sb.append(elem.first()).append("\n");
+      }
+      LOG.debug(sb.toString());
+    }
+    lastQueuedSnapshot = snapshot;
+  }
+
+  private void fillQueue(StreamingOffset fromOffset, StreamingOffset toOffset) 
{
+    LOG.debug("filling queue from {}, to: {}", fromOffset, toOffset);
+    Snapshot currentSnapshot = table().snapshot(fromOffset.snapshotId());
+    // this could be a partial snapshot so add it outside the loop
+    if (currentSnapshot != null) {
+      addMicroBatchToQueue(
+          currentSnapshot,
+          fromOffset.position(),
+          MicroBatchUtils.addedFilesCount(table(), currentSnapshot),
+          fromOffset.shouldScanAllFiles());
+    }
+    if (toOffset != null) {
+      if (currentSnapshot != null) {
+        while (currentSnapshot.snapshotId() != toOffset.snapshotId()) {
+          currentSnapshot = nextValidSnapshot(currentSnapshot);
+          if (currentSnapshot != null) {
+            addMicroBatchToQueue(
+                currentSnapshot,
+                0,
+                MicroBatchUtils.addedFilesCount(table(), currentSnapshot),
+                false);
+          } else {
+            break;
+          }
+        }
+      }
+      // toOffset snapshot already added in loop when currentSnapshot == 
toOffset
+    } else {
+      fillQueueInitialBuffer(currentSnapshot);
+    }
+  }
+
+  private void fillQueueInitialBuffer(Snapshot startSnapshot) {
+    // toOffset is null - fill initial buffer to prevent queue starvation 
before background
+    // thread starts. Use configured limits to avoid loading all snapshots
+    // (which could cause OOM on tables with thousands of snapshots).
+    long targetRows = readConf().asyncQueuePreloadRowLimit();
+    long targetFiles = readConf().asyncQueuePreloadFileLimit();
+
+    Snapshot tableCurrentSnapshot = table().currentSnapshot();
+    if (tableCurrentSnapshot == null) {
+      return; // Empty table
+    }
+
+    // START_OFFSET case: initialize using nextValidSnapshot which respects 
timestamp filtering
+    Snapshot current = startSnapshot;
+    if (current == null) {
+      current = nextValidSnapshot(null);
+      if (current != null) {
+        addMicroBatchToQueue(current, 0, 
MicroBatchUtils.addedFilesCount(table(), current), false);
+      }
+    }
+
+    // Continue loading more snapshots within safety limits
+    if (current != null) {
+      while ((queuedRowCount.get() < targetRows || queuedFileCount.get() < 
targetFiles)
+          && current.snapshotId() != tableCurrentSnapshot.snapshotId()) {
+        current = nextValidSnapshot(current);
+        if (current != null) {
+          addMicroBatchToQueue(
+              current, 0, MicroBatchUtils.addedFilesCount(table(), current), 
false);
+        } else {
+          break;
+        }
+      }
+    }
+  }
+
+  /** Try to populate the queue with data from unread snapshots */
+  private void fillQueue(Snapshot readFrom) {
+    // Don't add beyond cap for Trigger.AvailableNow
+    if (this.lastOffsetForTriggerAvailableNow != null
+        && readFrom != null
+        && readFrom.snapshotId() >= 
this.lastOffsetForTriggerAvailableNow.snapshotId()) {
+      LOG.debug(
+          "Reached cap snapshot {}, not adding more",
+          this.lastOffsetForTriggerAvailableNow.snapshotId());
+      return;
+    }
+
+    if ((queuedRowCount.get() > minQueuedRows) || (queuedFileCount.get() > 
minQueuedFiles)) {
+      // we have enough data buffered, check back shortly
+      LOG.debug(
+          "Buffer is full, {} > {} or {} > {}",
+          queuedRowCount.get(),
+          minQueuedRows,
+          queuedFileCount.get(),
+          minQueuedFiles);
+    } else {
+      // add an entire snapshot to the queue
+      Snapshot nextValidSnapshot = nextValidSnapshot(readFrom);
+      if (nextValidSnapshot != null) {
+        addMicroBatchToQueue(
+            nextValidSnapshot,
+            0,
+            MicroBatchUtils.addedFilesCount(table(), nextValidSnapshot),
+            false);
+      } else {
+        LOG.debug("No snapshots ready to be read");
+      }
+    }
+  }
+}
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
index 70c0129484..9298c2bbdf 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
@@ -81,15 +81,29 @@ abstract class BaseSparkMicroBatchPlanner implements 
SparkMicroBatchPlanner {
   }
 
   /**
-   * Get the next snapshot skipping over rewrite and delete snapshots.
+   * Get the next snapshot, skipping over rewrite and delete snapshots. Async callers must
+   * handle a null return.
    *
    * @param curSnapshot the current snapshot
    * @return the next valid snapshot (not a rewrite or delete snapshot), 
returns null if all
    *     remaining snapshots should be skipped.
    */
   protected Snapshot nextValidSnapshot(Snapshot curSnapshot) {
-    Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table, 
curSnapshot.snapshotId());
-
+    Snapshot nextSnapshot;
+    // if no valid snapshot was found previously, re-check for an initial offset
+    if (curSnapshot == null) {
+      StreamingOffset startingOffset =
+          MicroBatchUtils.determineStartingOffset(table, 
readConf.streamFromTimestamp());
+      LOG.debug("determineStartingOffset picked startingOffset: {}", 
startingOffset);
+      if (StreamingOffset.START_OFFSET.equals(startingOffset)) {
+        return null;
+      }
+      nextSnapshot = table.snapshot(startingOffset.snapshotId());
+    } else {
+      if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+        return null;
+      }
+      nextSnapshot = SnapshotUtil.snapshotAfter(table, 
curSnapshot.snapshotId());
+    }
     // skip over rewrite and delete snapshots
     while (!shouldProcess(nextSnapshot)) {
       LOG.debug("Skipping snapshot: {}", nextSnapshot);
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 7bea7c236c..c9a0f2566b 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -197,13 +197,18 @@ public class SparkMicroBatchStream implements 
MicroBatchStream, SupportsTriggerA
   }
 
   private void initializePlanner(StreamingOffset startOffset, StreamingOffset 
endOffset) {
-    this.planner =
-        new SyncSparkMicroBatchPlanner(table, readConf, 
lastOffsetForTriggerAvailableNow);
+    if (readConf.asyncMicroBatchPlanningEnabled()) {
+      this.planner =
+          new AsyncSparkMicroBatchPlanner(
+              table, readConf, startOffset, endOffset, 
lastOffsetForTriggerAvailableNow);
+    } else {
+      this.planner =
+          new SyncSparkMicroBatchPlanner(table, readConf, 
lastOffsetForTriggerAvailableNow);
+    }
   }
 
   @Override
   public Offset latestOffset(Offset startOffset, ReadLimit limit) {
-    // calculate end offset get snapshotId from the startOffset
     Preconditions.checkArgument(
         startOffset instanceof StreamingOffset,
         "Invalid start offset: %s is not a StreamingOffset",
diff --git 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
index fe662e2b83..f1b0029c54 100644
--- 
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
+++ 
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
@@ -187,6 +187,15 @@ class SyncSparkMicroBatchPlanner extends 
BaseSparkMicroBatchPlanner {
               if (curRecordCount >= maxRows) {
                 // we included the file, so increment the number of files
                 // read in the current snapshot.
+                if (curFilesAdded == 1 && curRecordCount > maxRows) {
+                  LOG.warn(
+                      "File {} contains {} records, exceeding 
maxRecordsPerMicroBatch limit of {}. "
+                          + "This file will be processed entirely to guarantee 
forward progress. "
+                          + "Consider increasing the limit or writing smaller 
files to avoid unexpected memory usage.",
+                      task.file().location(),
+                      task.file().recordCount(),
+                      maxRows);
+                }
                 ++curPos;
                 shouldContinueReading = false;
                 break;
diff --git 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 80f2c68640..98e83bdd17 100644
--- 
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++ 
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -31,13 +31,16 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.IntStream;
 import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DataOperations;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
 import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.RewriteFiles;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -53,7 +56,9 @@ import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkCatalogConfig;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.spark.api.java.function.VoidFunction2;
 import org.apache.spark.sql.Dataset;
@@ -73,10 +78,73 @@ import org.junit.jupiter.api.extension.ExtendWith;
 @ExtendWith(ParameterizedTestExtension.class)
 public final class TestStructuredStreamingRead3 extends CatalogTestBase {
 
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, 
async = {3}")
+  public static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties(),
+        false
+      },
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties(),
+        true
+      },
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties(),
+        false
+      },
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties(),
+        true
+      },
+      {
+        SparkCatalogConfig.REST.catalogName(),
+        SparkCatalogConfig.REST.implementation(),
+        ImmutableMap.builder()
+            .putAll(SparkCatalogConfig.REST.properties())
+            .put(CatalogProperties.URI, 
restCatalog.properties().get(CatalogProperties.URI))
+            .build(),
+        false
+      },
+      {
+        SparkCatalogConfig.REST.catalogName(),
+        SparkCatalogConfig.REST.implementation(),
+        ImmutableMap.builder()
+            .putAll(SparkCatalogConfig.REST.properties())
+            .put(CatalogProperties.URI, 
restCatalog.properties().get(CatalogProperties.URI))
+            .build(),
+        true
+      },
+      {
+        SparkCatalogConfig.SPARK_SESSION.catalogName(),
+        SparkCatalogConfig.SPARK_SESSION.implementation(),
+        SparkCatalogConfig.SPARK_SESSION.properties(),
+        false
+      },
+      {
+        SparkCatalogConfig.SPARK_SESSION.catalogName(),
+        SparkCatalogConfig.SPARK_SESSION.implementation(),
+        SparkCatalogConfig.SPARK_SESSION.properties(),
+        true
+      }
+    };
+  }
+
   private Table table;
 
   private final AtomicInteger microBatches = new AtomicInteger();
 
+  @Parameter(index = 3)
+  private Boolean async;
+
   /**
    * test data to be used by multiple writes each write creates a snapshot and 
writes a list of
    * records
@@ -197,8 +265,7 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         Trigger.AvailableNow());
 
    // a soft limit of 1 is enforced, so the stream is not blocked.
-    StreamingQuery query =
-        
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
 "1"));
+    StreamingQuery query = 
startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
 
    // check answer correctness; only 1 record is read, so the micro-batch will be stuck
     List<SimpleRecord> actual = rowsAvailable(query);
@@ -258,15 +325,40 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
         Trigger.AvailableNow());
   }
 
+  @TestTemplate
+  public void testReadStreamWithLowAsyncQueuePreload() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    // Set low preload limits to test async queue behavior - background thread 
should load
+    // remaining data
+    StreamingQuery query =
+        startStream(
+            ImmutableMap.of(
+                SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT,
+                "5",
+                SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT,
+                "5"));
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    assertThat(actual)
+        
.containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
+  }
+
   @TestTemplate
   public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws 
Exception {
     File writerCheckpointFolder = 
temp.resolve("writer-checkpoint-folder").toFile();
     File writerCheckpoint = new File(writerCheckpointFolder, 
"writer-checkpoint");
     File output = temp.resolve("junit").toFile();
 
+    Map<String, String> options = Maps.newHashMap();
+    options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
+
     DataStreamWriter querySource =
         spark
             .readStream()
+            .options(options)
             .format("iceberg")
             .load(tableName)
             .writeStream()
@@ -321,10 +413,17 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
     long expectedSnapshotId = table.currentSnapshot().snapshotId();
 
     String sinkTable = "availablenow_sink";
+    Map<String, String> options = Maps.newHashMap();
+    options.put(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1");
+    options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
+
     StreamingQuery query =
         spark
             .readStream()
-            .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+            .options(options)
             .format("iceberg")
             .load(tableName)
             .writeStream()
@@ -433,6 +532,8 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
 
     // Data appended after the timestamp should appear
     appendData(data);
+    // Give the async background thread time to refresh; otherwise the test is flaky
+    Thread.sleep(50);
     actual = rowsAvailable(query);
     assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
   }
@@ -885,13 +986,18 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   private static final String MEMORY_TABLE = "_stream_view_mem";
 
   private StreamingQuery startStream(Map<String, String> options) throws 
TimeoutException {
+    Map<String, String> allOptions = Maps.newHashMap(options);
+    allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
     return spark
         .readStream()
-        .options(options)
+        .options(allOptions)
         .format("iceberg")
         .load(tableName)
         .writeStream()
-        .options(options)
+        .options(allOptions)
         .format("memory")
         .queryName(MEMORY_TABLE)
         .outputMode(OutputMode.Append())
@@ -916,11 +1022,17 @@ public final class TestStructuredStreamingRead3 extends 
CatalogTestBase {
   private void assertMicroBatchRecordSizes(
       Map<String, String> options, List<Long> expectedMicroBatchRecordSize, 
Trigger trigger)
       throws TimeoutException {
-    Dataset<Row> ds = 
spark.readStream().options(options).format("iceberg").load(tableName);
+    Map<String, String> allOptions = Maps.newHashMap(options);
+    allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, 
async.toString());
+    if (async) {
+      
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, 
"1");
+    }
+
+    Dataset<Row> ds = 
spark.readStream().options(allOptions).format("iceberg").load(tableName);
 
     List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
     ds.writeStream()
-        .options(options)
+        .options(allOptions)
         .trigger(trigger)
         .foreachBatch(
             (VoidFunction2<Dataset<Row>, Long>)


Reply via email to