This is an automated email from the ASF dual-hosted git repository.
bryanck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 8b7f6d773f Spark 4.1: New Async Spark Micro Batch Planner (#15299)
8b7f6d773f is described below
commit 8b7f6d773f99d4b581f151efbd871c8965f2b3cb
Author: Ruijing Li <[email protected]>
AuthorDate: Mon Mar 16 07:54:58 2026 -0700
Spark 4.1: New Async Spark Micro Batch Planner (#15299)
---
docs/docs/spark-configuration.md | 5 +
docs/docs/spark-structured-streaming.md | 7 +
.../org/apache/iceberg/spark/SparkReadConf.java | 33 ++
.../org/apache/iceberg/spark/SparkReadOptions.java | 13 +
.../apache/iceberg/spark/SparkSQLProperties.java | 5 +
.../spark/source/AsyncSparkMicroBatchPlanner.java | 527 +++++++++++++++++++++
.../spark/source/BaseSparkMicroBatchPlanner.java | 20 +-
.../spark/source/SparkMicroBatchStream.java | 11 +-
.../spark/source/SyncSparkMicroBatchPlanner.java | 9 +
.../spark/source/TestStructuredStreamingRead3.java | 126 ++++-
10 files changed, 743 insertions(+), 13 deletions(-)
diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md
index 6dee824485..8193d1dc35 100644
--- a/docs/docs/spark-configuration.md
+++ b/docs/docs/spark-configuration.md
@@ -196,6 +196,7 @@ val spark = SparkSession.builder()
| spark.sql.iceberg.executor-cache.locality.enabled | false
| Enables locality-aware executor
cache usage
|
| spark.sql.iceberg.merge-schema | false
| Enables modifying the table schema
to match the write schema. Only adds missing columns
|
| spark.sql.iceberg.report-column-stats | true
| Report Puffin Table Statistics if
available to Spark's Cost Based Optimizer. CBO must be enabled for this to be
effective |
+| spark.sql.iceberg.async-micro-batch-planning-enabled | false
| Enables asynchronous microbatch
planning to reduce planning latency by pre-fetching file scan tasks
|
### Read options
@@ -220,6 +221,10 @@ spark.read
| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from;
if before the oldest known ancestor snapshot, the oldest will be used
|
| streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per
microbatch
|
| streaming-max-rows-per-micro-batch | INT_MAX | "Soft maximum" number of
rows per microbatch; always includes all rows in next unprocessed file,
excludes additional files if their inclusion would exceed the soft max limit |
+| async-micro-batch-planning-enabled | false |
Enables asynchronous microbatch planning to reduce planning latency by
pre-fetching file scan tasks
|
+| streaming-snapshot-polling-interval-ms | 30000 |
Overrides the polling time for async planner to refresh and detect new
snapshots. Only takes effect when async-micro-batch-planning-enabled is enabled
|
+| async-queue-preload-file-limit | 100 |
Overrides the number of files loaded to background queue initially. Tune to
prevent queue starvation. Only takes effect when async-micro-batch-planning-enabled
is enabled |
+| async-queue-preload-row-limit | 100000 |
Overrides the number of rows loaded to background queue initially. Tune to
prevent queue starvation. Only takes effect when async-micro-batch-planning-enabled
is enabled |
### Write options
diff --git a/docs/docs/spark-structured-streaming.md
b/docs/docs/spark-structured-streaming.md
index e722df1ea4..3313f8150b 100644
--- a/docs/docs/spark-structured-streaming.md
+++ b/docs/docs/spark-structured-streaming.md
@@ -63,6 +63,13 @@ val df = spark.readStream
!!! info
Note: In addition to limiting micro-batch sizes on queries that use the
default trigger (i.e. `Trigger.ProcessingTime`), rate limiting options can be
applied to queries that use `Trigger.AvailableNow` to split one-time processing
of all available source data into multiple micro-batches for better query
scalability. Rate limiting options will be ignored when using the deprecated
`Trigger.Once` trigger.
+### Asynchronous Micro-Batch Planning
+
+Users can enable asynchronous micro-batch planning by setting
`async-micro-batch-planning-enabled` to true. With this option enabled, Iceberg
will start processing the current micro-batch while planning the next
micro-batches in parallel.
+This can help improve query throughput by reducing idle time between
micro-batches. Users should weigh the tradeoffs, which include higher memory
usage and increased snapshot detection latency.
+
+Users can also set additional options to control the behavior of asynchronous
micro-batch planning, found in the [spark
configuration](spark-configuration.md#read-options).
+
## Streaming Writes
To write values from streaming query to Iceberg table, use `DataStreamWriter`:
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 36c95b3aaf..309d7c4fd1 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -225,6 +225,39 @@ public class SparkReadConf {
.parse();
}
+ public boolean asyncMicroBatchPlanningEnabled() {
+ return confParser
+ .booleanConf()
+ .option(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
+ .sessionConf(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
+
.defaultValue(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT)
+ .parse();
+ }
+
+ public long streamingSnapshotPollingIntervalMs() {
+ return confParser
+ .longConf()
+ .option(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS)
+
.defaultValue(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT)
+ .parse();
+ }
+
+ public long asyncQueuePreloadFileLimit() {
+ return confParser
+ .longConf()
+ .option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT)
+ .defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT)
+ .parse();
+ }
+
+ public long asyncQueuePreloadRowLimit() {
+ return confParser
+ .longConf()
+ .option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT)
+ .defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT)
+ .parse();
+ }
+
public boolean preserveDataGrouping() {
return confParser
.booleanConf()
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index 829fb9cdd2..e6d02d1047 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -84,6 +84,19 @@ public class SparkReadOptions {
public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
"streaming-max-rows-per-micro-batch";
+ // Enable async micro batch planning
+ public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
+ "async-micro-batch-planning-enabled";
+ // Polling interval for async planner to refresh table metadata (ms)
+ public static final String STREAMING_SNAPSHOT_POLLING_INTERVAL_MS =
+ "streaming-snapshot-polling-interval-ms";
+ public static final long STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT =
30000L;
+ // Initial queue preload limits for async micro batch planner
+ public static final String ASYNC_QUEUE_PRELOAD_FILE_LIMIT =
"async-queue-preload-file-limit";
+ public static final long ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT = 100L;
+ public static final String ASYNC_QUEUE_PRELOAD_ROW_LIMIT =
"async-queue-preload-row-limit";
+ public static final long ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT = 100000L;
+
// Table path
public static final String PATH = "path";
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 14b2b8c958..867370ea44 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -112,4 +112,9 @@ public class SparkSQLProperties {
// Prefix for custom snapshot properties
public static final String SNAPSHOT_PROPERTY_PREFIX =
"spark.sql.iceberg.snapshot-property.";
+
+ // Controls whether to enable async micro batch planning for session
+ public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
+ "spark.sql.iceberg.async-micro-batch-planning-enabled";
+ public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT =
false;
}
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..527b41cdcf
--- /dev/null
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.connector.read.streaming.ReadAllAvailable;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner
implements AutoCloseable {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncSparkMicroBatchPlanner.class);
+ private static final int PLAN_FILES_CACHE_MAX_SIZE = 10;
+ private static final long QUEUE_POLL_TIMEOUT_MS = 100L; // 100 ms
+
+ private final long minQueuedFiles;
+ private final long minQueuedRows;
+
+ // Cache for planFiles results to handle duplicate calls
+ private final Cache<Pair<StreamingOffset, StreamingOffset>,
List<FileScanTask>> planFilesCache;
+
+ // Queue to buffer pre-fetched file scan tasks
+ private final LinkedBlockingQueue<Pair<StreamingOffset, FileScanTask>> queue;
+
+ // Background executor for async operations
+ private final ScheduledExecutorService executor;
+
+ // Error tracking
+ private volatile Throwable refreshFailedThrowable;
+ private volatile Throwable fillQueueFailedThrowable;
+
+ // Tracking queue state
+ private final AtomicLong queuedFileCount = new AtomicLong(0);
+ private final AtomicLong queuedRowCount = new AtomicLong(0);
+ private volatile Pair<StreamingOffset, FileScanTask> tail;
+ private Snapshot lastQueuedSnapshot;
+ private boolean stopped;
+
+ // Cap for Trigger.AvailableNow - don't process beyond this offset
+ private final StreamingOffset lastOffsetForTriggerAvailableNow;
+
+ /**
+ * This class manages a queue of FileScanTask + StreamingOffset. On
creation, it starts up an
+ * asynchronous polling process which populates the queue when a new
snapshot arrives or the
+ * minimum amount of queued data is too low.
+ *
+ * <p>Note: this will capture the state of the table when snapshots are
added to the queue. If a
+ * snapshot is expired after being added to the queue, the job will still
process it.
+ */
+ AsyncSparkMicroBatchPlanner(
+ Table table,
+ SparkReadConf readConf,
+ StreamingOffset initialOffset,
+ StreamingOffset maybeEndOffset,
+ StreamingOffset lastOffsetForTriggerAvailableNow) {
+ super(table, readConf);
+ this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
+ this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
+ this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
+ this.planFilesCache =
Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build();
+ this.queue = new LinkedBlockingQueue<>();
+
+ table().refresh();
+ // Synchronously add data to the queue to meet our initial constraints
+ fillQueue(initialOffset, maybeEndOffset);
+
+ this.executor =
+ Executors.newSingleThreadScheduledExecutor(
+ r -> {
+ Thread thread = new Thread(r, "iceberg-async-planner-" +
table().name());
+ thread.setDaemon(true);
+ return thread;
+ });
+ // Schedule table refresh at configured interval
+ long pollingIntervalMs = readConf().streamingSnapshotPollingIntervalMs();
+ this.executor.scheduleWithFixedDelay(
+ this::refreshAndTrapException, pollingIntervalMs, pollingIntervalMs,
TimeUnit.MILLISECONDS);
+ // Schedule queue fill to run frequently (use polling interval for tests,
cap at 100ms for
+ // production)
+ long queueFillIntervalMs = Math.min(QUEUE_POLL_TIMEOUT_MS,
pollingIntervalMs);
+ executor.scheduleWithFixedDelay(
+ () -> fillQueueAndTrapException(lastQueuedSnapshot),
+ 0,
+ queueFillIntervalMs,
+ TimeUnit.MILLISECONDS);
+
+ LOG.info(
+ "Started AsyncSparkMicroBatchPlanner for {} from initialOffset: {}",
+ table().name(),
+ initialOffset);
+ }
+
+ @Override
+ public synchronized void stop() {
+ Preconditions.checkArgument(
+ !stopped, "AsyncSparkMicroBatchPlanner for {} was already stopped",
table().name());
+ stopped = true;
+ LOG.info("Stopping AsyncSparkMicroBatchPlanner for table: {}",
table().name());
+ executor.shutdownNow();
+ boolean terminated = false;
+ try {
+ terminated =
+ executor.awaitTermination(
+ readConf().streamingSnapshotPollingIntervalMs() * 2,
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ignored) {
+ // Restore interrupt status
+ Thread.currentThread().interrupt();
+ }
+ LOG.info("AsyncSparkMicroBatchPlanner for table: {}, stopped: {}",
table().name(), terminated);
+ }
+
+ @Override
+ public void close() {
+ stop();
+ }
+
+ /**
+ * Spark can call this multiple times; it should produce the same answer
every time.
+ *
+ * @param startOffset the starting offset of this microbatch, position is
inclusive
+ * @param endOffset the end offset of this microbatch, position is exclusive
+ * @return the list of files to scan between these offsets
+ */
+ @Override
+ public synchronized List<FileScanTask> planFiles(
+ StreamingOffset startOffset, StreamingOffset endOffset) {
+ return planFilesCache.get(
+ Pair.of(startOffset, endOffset),
+ key -> {
+ LOG.info(
+ "running planFiles for {}, startOffset: {}, endOffset: {}",
+ table().name(),
+ startOffset,
+ endOffset);
+ List<FileScanTask> result = new LinkedList<>();
+ Pair<StreamingOffset, FileScanTask> elem;
+ StreamingOffset currentOffset;
+ boolean shouldTerminate = false;
+ long filesInPlan = 0;
+ long rowsInPlan = 0;
+
+ do {
+ // Synchronize here since we are polling, checking for empty and
updating tail
+ synchronized (queue) {
+ try {
+ elem = queue.poll(QUEUE_POLL_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted while polling queue",
e);
+ }
+ if (queue.isEmpty()) {
+ tail = null;
+ }
+ }
+
+ if (elem != null) {
+ currentOffset = elem.first();
+ LOG.debug("planFiles consumed: {}", currentOffset);
+ FileScanTask currentTask = elem.second();
+ filesInPlan += 1;
+ long elemRows = currentTask.file().recordCount();
+ rowsInPlan += elemRows;
+ queuedFileCount.decrementAndGet();
+ queuedRowCount.addAndGet(-elemRows);
+ result.add(currentTask);
+
+ // try to peek at the next entry of the queue and see if we
should stop
+ Pair<StreamingOffset, FileScanTask> nextElem = queue.peek();
+ boolean endOffsetPeek = false;
+ if (nextElem != null) {
+ endOffsetPeek = endOffset.equals(nextElem.first());
+ }
+ // end offset may be synthetic and not exist in the queue
+ boolean endOffsetSynthetic =
+ currentOffset.snapshotId() == endOffset.snapshotId()
+ && (currentOffset.position() + 1) ==
endOffset.position();
+ shouldTerminate = endOffsetPeek || endOffsetSynthetic;
+ } else {
+ LOG.trace("planFiles hasn't reached {}, waiting", endOffset);
+ }
+ } while (!shouldTerminate && refreshFailedThrowable == null);
+
+ if (refreshFailedThrowable != null) {
+ throw new RuntimeException("Table refresh failed",
refreshFailedThrowable);
+ }
+
+ LOG.info(
+ "completed planFiles for {}, startOffset: {}, endOffset: {},
files: {}, rows: {}",
+ table().name(),
+ startOffset,
+ endOffset,
+ filesInPlan,
+ rowsInPlan);
+ return result;
+ });
+ }
+
+ /**
+ * This needs to be non destructive on the queue as spark could call this
multiple times. Each
+ * time, depending on the table state it could return something different
+ *
+ * @param startOffset the starting offset of the next microbatch
+ * @param limit a limit for how many files/bytes/rows the next microbatch
should include
+ * @return The end offset to use for the next microbatch, null signals that
no data is available
+ */
+ @Override
+ public synchronized StreamingOffset latestOffset(StreamingOffset
startOffset, ReadLimit limit) {
+ LOG.info(
+ "running latestOffset for {}, startOffset: {}, limit: {}",
+ table().name(),
+ startOffset,
+ limit);
+
+ if (table().currentSnapshot() == null) {
+ LOG.info("latestOffset returning START_OFFSET, currentSnapshot() is
null");
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (table().currentSnapshot().timestampMillis() <
readConf().streamFromTimestamp()) {
+ LOG.info("latestOffset returning START_OFFSET, currentSnapshot() <
fromTimestamp");
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // if any exceptions were encountered in the background process, raise
them here
+ if (refreshFailedThrowable != null) {
+ throw new RuntimeException(refreshFailedThrowable);
+ }
+ if (fillQueueFailedThrowable != null) {
+ throw new RuntimeException(fillQueueFailedThrowable);
+ }
+
+ // if we want to read all available we don't need to scan files, just
snapshots
+ if (limit instanceof ReadAllAvailable) {
+ // If Trigger.AvailableNow cap is set, return it directly
+ if (this.lastOffsetForTriggerAvailableNow != null) {
+ return this.lastOffsetForTriggerAvailableNow;
+ }
+ Snapshot lastValidSnapshot = table().snapshot(startOffset.snapshotId());
+ Snapshot nextValidSnapshot;
+ do {
+ nextValidSnapshot = nextValidSnapshot(lastValidSnapshot);
+ if (nextValidSnapshot != null) {
+ lastValidSnapshot = nextValidSnapshot;
+ }
+ } while (nextValidSnapshot != null);
+ return new StreamingOffset(
+ lastValidSnapshot.snapshotId(),
+ MicroBatchUtils.addedFilesCount(table(), lastValidSnapshot),
+ false);
+ }
+
+ return computeLimitedOffset(limit);
+ }
+
+ private StreamingOffset computeLimitedOffset(ReadLimit limit) {
+ UnpackedLimits unpackedLimits = new UnpackedLimits(limit);
+ long rowsSeen = 0;
+ long filesSeen = 0;
+ LOG.debug(
+ "latestOffset queue status, queuedFiles: {}, queuedRows: {}",
+ queuedFileCount.get(),
+ queuedRowCount.get());
+
+ // Convert to list for indexed access
+ List<Pair<StreamingOffset, FileScanTask>> queueList =
Lists.newArrayList(queue);
+ for (int i = 0; i < queueList.size(); i++) {
+ Pair<StreamingOffset, FileScanTask> elem = queueList.get(i);
+ long fileRows = elem.second().file().recordCount();
+
+ // Hard limit on files - stop BEFORE exceeding
+ if (filesSeen + 1 > unpackedLimits.getMaxFiles()) {
+ if (filesSeen == 0) {
+ return null;
+ }
+ LOG.debug(
+ "latestOffset hit file limit at {}, rows: {}, files: {}",
+ elem.first(),
+ rowsSeen,
+ filesSeen);
+ return elem.first();
+ }
+
+ // Soft limit on rows - include file FIRST, then check
+ rowsSeen += fileRows;
+ filesSeen += 1;
+
+ // Check if we've hit the row limit after including this file
+ if (rowsSeen >= unpackedLimits.getMaxRows()) {
+ if (filesSeen == 1 && rowsSeen > unpackedLimits.getMaxRows()) {
+ LOG.warn(
+ "File {} at offset {} contains {} records, exceeding
maxRecordsPerMicroBatch limit of {}. "
+ + "This file will be processed entirely to guarantee forward
progress. "
+ + "Consider increasing the limit or writing smaller files to
avoid unexpected memory usage.",
+ elem.second().file().location(),
+ elem.first(),
+ fileRows,
+ unpackedLimits.getMaxRows());
+ }
+ // Return the offset of the NEXT element (or synthesize tail+1)
+ if (i + 1 < queueList.size()) {
+ LOG.debug(
+ "latestOffset hit row limit at {}, rows: {}, files: {}",
+ queueList.get(i + 1).first(),
+ rowsSeen,
+ filesSeen);
+ return queueList.get(i + 1).first();
+ } else {
+ // This is the last element - return tail+1
+ StreamingOffset current = elem.first();
+ StreamingOffset result =
+ new StreamingOffset(
+ current.snapshotId(), current.position() + 1,
current.shouldScanAllFiles());
+ LOG.debug(
+ "latestOffset hit row limit at tail {}, rows: {}, files: {}",
+ result,
+ rowsSeen,
+ filesSeen);
+ return result;
+ }
+ }
+ }
+
+ // if we got here there aren't enough files to exceed our limits
+ if (tail != null) {
+ StreamingOffset tailOffset = tail.first();
+ // we have to increment the position by 1 since we want to include the
tail in the read and
+ // position is non-inclusive
+ StreamingOffset latestOffset =
+ new StreamingOffset(
+ tailOffset.snapshotId(), tailOffset.position() + 1,
tailOffset.shouldScanAllFiles());
+ LOG.debug("latestOffset returning all queued data {}", latestOffset);
+ return latestOffset;
+ }
+
+ // if we got here the queue is empty
+ LOG.debug("latestOffset no data, returning null");
+ return null;
+ }
+
+ // Background task wrapper that traps exceptions
+ private void refreshAndTrapException() {
+ try {
+ table().refresh();
+ } catch (Throwable t) {
+ LOG.error("Failed to refresh table {}", table().name(), t);
+ refreshFailedThrowable = t;
+ }
+ }
+
+ // Background task wrapper that traps exceptions
+ private void fillQueueAndTrapException(Snapshot snapshot) {
+ try {
+ fillQueue(snapshot);
+ } catch (Throwable t) {
+ LOG.error("Failed to fill queue for table {}", table().name(), t);
+ fillQueueFailedThrowable = t;
+ }
+ }
+
+ /** Generate a MicroBatch based on input parameters and add to the queue */
+ private void addMicroBatchToQueue(
+ Snapshot snapshot, long startFileIndex, long endFileIndex, boolean
shouldScanAllFile) {
+ LOG.info("Adding MicroBatch for snapshot: {} to the queue",
snapshot.snapshotId());
+ MicroBatches.MicroBatch microBatch =
+ MicroBatches.from(snapshot, table().io())
+ .caseSensitive(readConf().caseSensitive())
+ .specsById(table().specs())
+ .generate(startFileIndex, endFileIndex, Long.MAX_VALUE,
shouldScanAllFile);
+
+ long position = startFileIndex;
+ for (FileScanTask task : microBatch.tasks()) {
+ Pair<StreamingOffset, FileScanTask> elem =
+ Pair.of(new StreamingOffset(microBatch.snapshotId(), position,
shouldScanAllFile), task);
+ queuedFileCount.incrementAndGet();
+ queuedRowCount.addAndGet(task.file().recordCount());
+ // I have to synchronize here so queue and tail can never be out of sync
+ synchronized (queue) {
+ queue.add(elem);
+ tail = elem;
+ }
+ position += 1;
+ }
+ if (LOG.isDebugEnabled()) {
+ StringBuilder sb = new StringBuilder("\n");
+ for (Pair<StreamingOffset, FileScanTask> elem : queue) {
+ sb.append(elem.first()).append("\n");
+ }
+ LOG.debug(sb.toString());
+ }
+ lastQueuedSnapshot = snapshot;
+ }
+
+ private void fillQueue(StreamingOffset fromOffset, StreamingOffset toOffset)
{
+ LOG.debug("filling queue from {}, to: {}", fromOffset, toOffset);
+ Snapshot currentSnapshot = table().snapshot(fromOffset.snapshotId());
+ // this could be a partial snapshot so add it outside the loop
+ if (currentSnapshot != null) {
+ addMicroBatchToQueue(
+ currentSnapshot,
+ fromOffset.position(),
+ MicroBatchUtils.addedFilesCount(table(), currentSnapshot),
+ fromOffset.shouldScanAllFiles());
+ }
+ if (toOffset != null) {
+ if (currentSnapshot != null) {
+ while (currentSnapshot.snapshotId() != toOffset.snapshotId()) {
+ currentSnapshot = nextValidSnapshot(currentSnapshot);
+ if (currentSnapshot != null) {
+ addMicroBatchToQueue(
+ currentSnapshot,
+ 0,
+ MicroBatchUtils.addedFilesCount(table(), currentSnapshot),
+ false);
+ } else {
+ break;
+ }
+ }
+ }
+ // toOffset snapshot already added in loop when currentSnapshot ==
toOffset
+ } else {
+ fillQueueInitialBuffer(currentSnapshot);
+ }
+ }
+
+ private void fillQueueInitialBuffer(Snapshot startSnapshot) {
+ // toOffset is null - fill initial buffer to prevent queue starvation
before background
+ // thread starts. Use configured limits to avoid loading all snapshots
+ // (which could cause OOM on tables with thousands of snapshots).
+ long targetRows = readConf().asyncQueuePreloadRowLimit();
+ long targetFiles = readConf().asyncQueuePreloadFileLimit();
+
+ Snapshot tableCurrentSnapshot = table().currentSnapshot();
+ if (tableCurrentSnapshot == null) {
+ return; // Empty table
+ }
+
+ // START_OFFSET case: initialize using nextValidSnapshot which respects
timestamp filtering
+ Snapshot current = startSnapshot;
+ if (current == null) {
+ current = nextValidSnapshot(null);
+ if (current != null) {
+ addMicroBatchToQueue(current, 0,
MicroBatchUtils.addedFilesCount(table(), current), false);
+ }
+ }
+
+ // Continue loading more snapshots within safety limits
+ if (current != null) {
+ while ((queuedRowCount.get() < targetRows || queuedFileCount.get() <
targetFiles)
+ && current.snapshotId() != tableCurrentSnapshot.snapshotId()) {
+ current = nextValidSnapshot(current);
+ if (current != null) {
+ addMicroBatchToQueue(
+ current, 0, MicroBatchUtils.addedFilesCount(table(), current),
false);
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
+ /** Try to populate the queue with data from unread snapshots */
+ private void fillQueue(Snapshot readFrom) {
+ // Don't add beyond cap for Trigger.AvailableNow
+ if (this.lastOffsetForTriggerAvailableNow != null
+ && readFrom != null
+ && readFrom.snapshotId() >=
this.lastOffsetForTriggerAvailableNow.snapshotId()) {
+ LOG.debug(
+ "Reached cap snapshot {}, not adding more",
+ this.lastOffsetForTriggerAvailableNow.snapshotId());
+ return;
+ }
+
+ if ((queuedRowCount.get() > minQueuedRows) || (queuedFileCount.get() >
minQueuedFiles)) {
+ // we have enough data buffered, check back shortly
+ LOG.debug(
+ "Buffer is full, {} > {} or {} > {}",
+ queuedRowCount.get(),
+ minQueuedRows,
+ queuedFileCount.get(),
+ minQueuedFiles);
+ } else {
+ // add an entire snapshot to the queue
+ Snapshot nextValidSnapshot = nextValidSnapshot(readFrom);
+ if (nextValidSnapshot != null) {
+ addMicroBatchToQueue(
+ nextValidSnapshot,
+ 0,
+ MicroBatchUtils.addedFilesCount(table(), nextValidSnapshot),
+ false);
+ } else {
+ LOG.debug("No snapshots ready to be read");
+ }
+ }
+ }
+}
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
index 70c0129484..9298c2bbdf 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
@@ -81,15 +81,29 @@ abstract class BaseSparkMicroBatchPlanner implements
SparkMicroBatchPlanner {
}
/**
- * Get the next snapshot skipping over rewrite and delete snapshots.
+ * Get the next snapshot skipping over rewrite and delete snapshots. Async
must handle nulls.
*
* @param curSnapshot the current snapshot
* @return the next valid snapshot (not a rewrite or delete snapshot),
returns null if all
* remaining snapshots should be skipped.
*/
protected Snapshot nextValidSnapshot(Snapshot curSnapshot) {
- Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table,
curSnapshot.snapshotId());
-
+ Snapshot nextSnapshot;
+ // if there were no valid snapshots, check for an initialOffset again
+ if (curSnapshot == null) {
+ StreamingOffset startingOffset =
+ MicroBatchUtils.determineStartingOffset(table,
readConf.streamFromTimestamp());
+ LOG.debug("determineStartingOffset picked startingOffset: {}",
startingOffset);
+ if (StreamingOffset.START_OFFSET.equals(startingOffset)) {
+ return null;
+ }
+ nextSnapshot = table.snapshot(startingOffset.snapshotId());
+ } else {
+ if (curSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
+ return null;
+ }
+ nextSnapshot = SnapshotUtil.snapshotAfter(table,
curSnapshot.snapshotId());
+ }
// skip over rewrite and delete snapshots
while (!shouldProcess(nextSnapshot)) {
LOG.debug("Skipping snapshot: {}", nextSnapshot);
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 7bea7c236c..c9a0f2566b 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -197,13 +197,18 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
}
private void initializePlanner(StreamingOffset startOffset, StreamingOffset
endOffset) {
- this.planner =
- new SyncSparkMicroBatchPlanner(table, readConf,
lastOffsetForTriggerAvailableNow);
+ if (readConf.asyncMicroBatchPlanningEnabled()) {
+ this.planner =
+ new AsyncSparkMicroBatchPlanner(
+ table, readConf, startOffset, endOffset,
lastOffsetForTriggerAvailableNow);
+ } else {
+ this.planner =
+ new SyncSparkMicroBatchPlanner(table, readConf,
lastOffsetForTriggerAvailableNow);
+ }
}
@Override
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
- // calculate end offset get snapshotId from the startOffset
Preconditions.checkArgument(
startOffset instanceof StreamingOffset,
"Invalid start offset: %s is not a StreamingOffset",
diff --git
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
index fe662e2b83..f1b0029c54 100644
---
a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
+++
b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
@@ -187,6 +187,15 @@ class SyncSparkMicroBatchPlanner extends
BaseSparkMicroBatchPlanner {
if (curRecordCount >= maxRows) {
// we included the file, so increment the number of files
// read in the current snapshot.
+ if (curFilesAdded == 1 && curRecordCount > maxRows) {
+ LOG.warn(
+ "File {} contains {} records, exceeding
maxRecordsPerMicroBatch limit of {}. "
+ + "This file will be processed entirely to guarantee
forward progress. "
+ + "Consider increasing the limit or writing smaller
files to avoid unexpected memory usage.",
+ task.file().location(),
+ task.file().recordCount(),
+ maxRows);
+ }
++curPos;
shouldContinueReading = false;
break;
diff --git
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 80f2c68640..98e83bdd17 100644
---
a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -31,13 +31,16 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -53,7 +56,9 @@ import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkCatalogConfig;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
@@ -73,10 +78,73 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ParameterizedTestExtension.class)
public final class TestStructuredStreamingRead3 extends CatalogTestBase {
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2},
async = {3}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ SparkCatalogConfig.HIVE.properties(),
+ false
+ },
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ SparkCatalogConfig.HIVE.properties(),
+ true
+ },
+ {
+ SparkCatalogConfig.HADOOP.catalogName(),
+ SparkCatalogConfig.HADOOP.implementation(),
+ SparkCatalogConfig.HADOOP.properties(),
+ false
+ },
+ {
+ SparkCatalogConfig.HADOOP.catalogName(),
+ SparkCatalogConfig.HADOOP.implementation(),
+ SparkCatalogConfig.HADOOP.properties(),
+ true
+ },
+ {
+ SparkCatalogConfig.REST.catalogName(),
+ SparkCatalogConfig.REST.implementation(),
+ ImmutableMap.builder()
+ .putAll(SparkCatalogConfig.REST.properties())
+ .put(CatalogProperties.URI,
restCatalog.properties().get(CatalogProperties.URI))
+ .build(),
+ false
+ },
+ {
+ SparkCatalogConfig.REST.catalogName(),
+ SparkCatalogConfig.REST.implementation(),
+ ImmutableMap.builder()
+ .putAll(SparkCatalogConfig.REST.properties())
+ .put(CatalogProperties.URI,
restCatalog.properties().get(CatalogProperties.URI))
+ .build(),
+ true
+ },
+ {
+ SparkCatalogConfig.SPARK_SESSION.catalogName(),
+ SparkCatalogConfig.SPARK_SESSION.implementation(),
+ SparkCatalogConfig.SPARK_SESSION.properties(),
+ false
+ },
+ {
+ SparkCatalogConfig.SPARK_SESSION.catalogName(),
+ SparkCatalogConfig.SPARK_SESSION.implementation(),
+ SparkCatalogConfig.SPARK_SESSION.properties(),
+ true
+ }
+ };
+ }
+
private Table table;
private final AtomicInteger microBatches = new AtomicInteger();
+ @Parameter(index = 3)
+ private Boolean async;
+
/**
* test data to be used by multiple writes each write creates a snapshot and
writes a list of
* records
@@ -197,8 +265,7 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
Trigger.AvailableNow());
  // soft limit of 1 is being enforced, so the stream is not blocked.
- StreamingQuery query =
-
startStream(ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"1"));
+ StreamingQuery query =
startStream(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "1");
  // check answer correctness; only 1 record is read, and the micro-batch will be
stuck
List<SimpleRecord> actual = rowsAvailable(query);
@@ -258,15 +325,40 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
Trigger.AvailableNow());
}
+ @TestTemplate
+ public void testReadStreamWithLowAsyncQueuePreload() throws Exception {
+ appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+ // Set low preload limits to test async queue behavior - background thread
should load
+ // remaining data
+ StreamingQuery query =
+ startStream(
+ ImmutableMap.of(
+ SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT,
+ "5",
+ SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT,
+ "5"));
+
+ List<SimpleRecord> actual = rowsAvailable(query);
+ assertThat(actual)
+
.containsExactlyInAnyOrderElementsOf(Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS));
+ }
+
@TestTemplate
public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws
Exception {
File writerCheckpointFolder =
temp.resolve("writer-checkpoint-folder").toFile();
File writerCheckpoint = new File(writerCheckpointFolder,
"writer-checkpoint");
File output = temp.resolve("junit").toFile();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
async.toString());
+ if (async) {
+ options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
"1");
+ }
+
DataStreamWriter querySource =
spark
.readStream()
+ .options(options)
.format("iceberg")
.load(tableName)
.writeStream()
@@ -321,10 +413,17 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
long expectedSnapshotId = table.currentSnapshot().snapshotId();
String sinkTable = "availablenow_sink";
+ Map<String, String> options = Maps.newHashMap();
+ options.put(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1");
+ options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
async.toString());
+ if (async) {
+ options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
"1");
+ }
+
StreamingQuery query =
spark
.readStream()
- .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+ .options(options)
.format("iceberg")
.load(tableName)
.writeStream()
@@ -433,6 +532,8 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
// Data appended after the timestamp should appear
appendData(data);
+ // Allow async background thread to refresh, else test sometimes fails
+ Thread.sleep(50);
actual = rowsAvailable(query);
assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
}
@@ -885,13 +986,18 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
private static final String MEMORY_TABLE = "_stream_view_mem";
private StreamingQuery startStream(Map<String, String> options) throws
TimeoutException {
+ Map<String, String> allOptions = Maps.newHashMap(options);
+ allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
async.toString());
+ if (async) {
+
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
"1");
+ }
return spark
.readStream()
- .options(options)
+ .options(allOptions)
.format("iceberg")
.load(tableName)
.writeStream()
- .options(options)
+ .options(allOptions)
.format("memory")
.queryName(MEMORY_TABLE)
.outputMode(OutputMode.Append())
@@ -916,11 +1022,17 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
private void assertMicroBatchRecordSizes(
Map<String, String> options, List<Long> expectedMicroBatchRecordSize,
Trigger trigger)
throws TimeoutException {
- Dataset<Row> ds =
spark.readStream().options(options).format("iceberg").load(tableName);
+ Map<String, String> allOptions = Maps.newHashMap(options);
+ allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
async.toString());
+ if (async) {
+
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
"1");
+ }
+
+ Dataset<Row> ds =
spark.readStream().options(allOptions).format("iceberg").load(tableName);
List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
ds.writeStream()
- .options(options)
+ .options(allOptions)
.trigger(trigger)
.foreachBatch(
(VoidFunction2<Dataset<Row>, Long>)