This is an automated email from the ASF dual-hosted git repository.
kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e57247b55a Spark 3.4: Backport Async Micro Batch Planner to 3.4 (#16311)
e57247b55a is described below
commit e57247b55a737cfc3702949181b3a2a3d10174b2
Author: Kevin Liu <[email protected]>
AuthorDate: Tue May 12 23:29:53 2026 -0400
Spark 3.4: Backport Async Micro Batch Planner to 3.4 (#16311)
Backport of #15992 to spark/v3.4. Stacked on PR #16307 (#15683
SerializableFileIOWithSize), which is itself a backport.
Adaptations from the source PR:
- SparkMicroBatchStream.java was replaced wholesale with the v3.5
post-#15992 version because v3.4 had structural drift; the refactor extracts
the planning logic into the new planner classes and there are no v3.4-only
features in this file.
- TestStructuredStreamingRead3.java was likewise replaced with the v3.5
version (which adds parameterized sync/async coverage). The only non-mechanical
change is using 'SparkCatalogConfig.SPARK' instead of
'SparkCatalogConfig.SPARK_SESSION', because v3.4 still uses the older enum name.
---
.../org/apache/iceberg/spark/SparkReadConf.java | 33 ++
.../org/apache/iceberg/spark/SparkReadOptions.java | 15 +
.../apache/iceberg/spark/SparkSQLProperties.java | 5 +
.../spark/source/AsyncSparkMicroBatchPlanner.java | 543 +++++++++++++++++++++
.../spark/source/BaseSparkMicroBatchPlanner.java | 151 ++++++
.../iceberg/spark/source/MicroBatchUtils.java | 69 +++
.../spark/source/SparkMicroBatchPlanner.java | 47 ++
.../spark/source/SparkMicroBatchStream.java | 353 ++------------
.../spark/source/SyncSparkMicroBatchPlanner.java | 249 ++++++++++
.../source/TestAsyncSparkMicroBatchPlanner.java | 61 +++
.../spark/source/TestMicroBatchPlanningUtils.java | 100 ++++
.../spark/source/TestStructuredStreamingRead3.java | 309 +++++++++++-
12 files changed, 1588 insertions(+), 347 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 3c8556731f..0c13338764 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -265,6 +265,39 @@ public class SparkReadConf {
.parse();
}
+  /**
+   * Whether streaming reads should plan micro-batches asynchronously. Resolved from the read option
+   * {@link SparkReadOptions#ASYNC_MICRO_BATCH_PLANNING_ENABLED}, the session property {@link
+   * SparkSQLProperties#ASYNC_MICRO_BATCH_PLANNING_ENABLED}, or the session default.
+   */
+  public boolean asyncMicroBatchPlanningEnabled() {
+    return confParser
+        .booleanConf()
+        .option(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
+        .sessionConf(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
+        .defaultValue(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT)
+        .parse();
+  }
+
+  /** Interval, in milliseconds, at which the async planner refreshes table metadata. */
+  public long streamingSnapshotPollingIntervalMs() {
+    return parseLongReadOption(
+        SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
+        SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT);
+  }
+
+  /** File-count target for the async planner's initial, synchronous queue preload. */
+  public long asyncQueuePreloadFileLimit() {
+    return parseLongReadOption(
+        SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT,
+        SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT);
+  }
+
+  /** Row-count target for the async planner's initial, synchronous queue preload. */
+  public long asyncQueuePreloadRowLimit() {
+    return parseLongReadOption(
+        SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT,
+        SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT);
+  }
+
+  // Parses a long setting that can only be supplied as a read option (there is no session
+  // property for it), falling back to the given default.
+  private long parseLongReadOption(String optionName, long defaultValue) {
+    return confParser.longConf().option(optionName).defaultValue(defaultValue).parse();
+  }
+
public boolean preserveDataGrouping() {
return confParser
.booleanConf()
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
index c31a7e5554..a6e6479d66 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java
@@ -92,6 +92,21 @@ public class SparkReadOptions {
public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
"streaming-max-rows-per-micro-batch";
+ // Enable async micro batch planning
+ public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
+ "async-micro-batch-planning-enabled";
+
+ // Polling interval for async planner to refresh table metadata (ms)
+ public static final String STREAMING_SNAPSHOT_POLLING_INTERVAL_MS =
+ "streaming-snapshot-polling-interval-ms";
+ public static final long STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT =
30000L;
+
+ // Initial queue preload limits for async micro batch planner
+ public static final String ASYNC_QUEUE_PRELOAD_FILE_LIMIT =
"async-queue-preload-file-limit";
+ public static final long ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT = 100L;
+ public static final String ASYNC_QUEUE_PRELOAD_ROW_LIMIT =
"async-queue-preload-row-limit";
+ public static final long ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT = 100000L;
+
// Table path
public static final String PATH = "path";
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index 41dc73add3..1ccf8c3c83 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -106,4 +106,9 @@ public class SparkSQLProperties {
// Controls whether to report available column statistics to Spark for query
optimization.
public static final String REPORT_COLUMN_STATS =
"spark.sql.iceberg.report-column-stats";
public static final boolean REPORT_COLUMN_STATS_DEFAULT = true;
+
+ // Controls whether to enable async micro batch planning for session
+ public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
+ "spark.sql.iceberg.async-micro-batch-planning-enabled";
+ public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT =
false;
}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..3e442f9917
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/AsyncSparkMicroBatchPlanner.java
@@ -0,0 +1,543 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import
org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Pair;
+import org.apache.spark.sql.connector.read.streaming.ReadAllAvailable;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class AsyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner implements AutoCloseable {
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncSparkMicroBatchPlanner.class);
+  private static final int PLAN_FILES_CACHE_MAX_SIZE = 10;
+  private static final long QUEUE_POLL_TIMEOUT_MS = 100L; // 100 ms
+
+  // Background fills keep adding whole snapshots until the queue holds more than this many files
+  // or more than this many rows
+  private final long minQueuedFiles;
+  private final long minQueuedRows;
+
+  // Cache for planFiles results so repeated calls with the same offsets return the same tasks
+  private final Cache<Pair<StreamingOffset, StreamingOffset>, List<FileScanTask>> planFilesCache;
+
+  // Queue of pre-fetched file scan tasks, each tagged with the offset of its file
+  private final LinkedBlockingDeque<Pair<StreamingOffset, FileScanTask>> queue;
+
+  // Single background thread that refreshes the table and fills the queue
+  private final ScheduledExecutorService executor;
+
+  // Failures captured by the background tasks; rethrown from planFiles/latestOffset
+  private volatile Throwable refreshFailedThrowable;
+  private volatile Throwable fillQueueFailedThrowable;
+
+  // Running totals of the files/rows currently sitting in the queue
+  private final AtomicLong queuedFileCount = new AtomicLong(0);
+  private final AtomicLong queuedRowCount = new AtomicLong(0);
+  // Last snapshot added to the queue; written by the constructor before the executor exists and
+  // afterwards only by the background fill task
+  private Snapshot lastQueuedSnapshot;
+  // Only read/written inside the synchronized stop()/close()
+  private boolean stopped;
+
+  // Cap for Trigger.AvailableNow - don't process beyond this offset
+  private final StreamingOffset lastOffsetForTriggerAvailableNow;
+
+  /**
+   * This class manages a queue of FileScanTask + StreamingOffset. On creation, it starts up an
+   * asynchronous polling process which populates the queue when a new snapshot arrives or the
+   * minimum amount of queued data is too low.
+   *
+   * <p>Note: this will capture the state of the table when snapshots are added to the queue. If a
+   * snapshot is expired after being added to the queue, the job will still process it.
+   *
+   * @param table the table to stream from
+   * @param readConf read configuration; supplies the per-batch file/row limits (used as queue
+   *     targets), the preload limits and the polling interval
+   * @param initialOffset offset the synchronous preload starts from
+   * @param maybeEndOffset if non-null, whole snapshots up to this offset's snapshot are preloaded;
+   *     otherwise the preload is bounded by the configured preload limits
+   * @param lastOffsetForTriggerAvailableNow if non-null, the Trigger.AvailableNow cap; background
+   *     fills and the limit-bounded preload stop at this offset's snapshot
+   */
+  AsyncSparkMicroBatchPlanner(
+      Table table,
+      SparkReadConf readConf,
+      StreamingOffset initialOffset,
+      StreamingOffset maybeEndOffset,
+      StreamingOffset lastOffsetForTriggerAvailableNow) {
+    super(table, readConf);
+    this.minQueuedFiles = readConf().maxFilesPerMicroBatch();
+    this.minQueuedRows = readConf().maxRecordsPerMicroBatch();
+    this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
+    this.planFilesCache = Caffeine.newBuilder().maximumSize(PLAN_FILES_CACHE_MAX_SIZE).build();
+    this.queue = new LinkedBlockingDeque<>();
+
+    table().refresh();
+
+    // Synchronously add data to the queue to meet our initial constraints.
+    // For Trigger.AvailableNow, constructor-time preload is normally initialized from
+    // latestOffset(...) with no explicit end offset, so bounded preload must stop at
+    // Trigger.AvailableNow snapshot.
+    fillQueue(initialOffset, maybeEndOffset);
+
+    this.executor =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread thread = new Thread(r, "iceberg-async-planner-" + table().name());
+              thread.setDaemon(true);
+              return thread;
+            });
+    // Schedule table refresh at configured interval
+    long pollingIntervalMs = readConf().streamingSnapshotPollingIntervalMs();
+    this.executor.scheduleWithFixedDelay(
+        this::refreshAndTrapException, pollingIntervalMs, pollingIntervalMs, TimeUnit.MILLISECONDS);
+    // Fill the queue every min(100 ms, polling interval). The lambda re-reads lastQueuedSnapshot
+    // on every run, so each fill continues from whatever was queued last.
+    long queueFillIntervalMs = Math.min(QUEUE_POLL_TIMEOUT_MS, pollingIntervalMs);
+    executor.scheduleWithFixedDelay(
+        () -> fillQueueAndTrapException(lastQueuedSnapshot),
+        0,
+        queueFillIntervalMs,
+        TimeUnit.MILLISECONDS);
+
+    LOG.info(
+        "Started AsyncSparkMicroBatchPlanner for {} from initialOffset: {}",
+        table().name(),
+        initialOffset);
+  }
+
+  /**
+   * Shuts down the background refresh/fill tasks and waits up to twice the polling interval for
+   * them to terminate.
+   *
+   * @throws IllegalArgumentException if this planner was already stopped
+   */
+  @Override
+  public synchronized void stop() {
+    // Preconditions formats messages with %s placeholders; an SLF4J-style "{}" is not substituted
+    // (the table name would only be appended as a trailing "[...]").
+    Preconditions.checkArgument(
+        !stopped, "AsyncSparkMicroBatchPlanner for %s was already stopped", table().name());
+    stopped = true;
+    LOG.info("Stopping AsyncSparkMicroBatchPlanner for table: {}", table().name());
+    executor.shutdownNow();
+    boolean terminated = false;
+    try {
+      terminated =
+          executor.awaitTermination(
+              readConf().streamingSnapshotPollingIntervalMs() * 2, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ignored) {
+      // Restore interrupt status
+      Thread.currentThread().interrupt();
+    }
+    LOG.info("AsyncSparkMicroBatchPlanner for table: {}, stopped: {}", table().name(), terminated);
+  }
+
+  /**
+   * Stops the planner if it is still running. Unlike {@link #stop()}, calling this more than once
+   * (or after {@link #stop()}) is a no-op, as recommended for {@link AutoCloseable#close()}.
+   */
+  @Override
+  public synchronized void close() {
+    if (!stopped) {
+      stop();
+    }
+  }
+
+  /**
+   * Spark can call this multiple times; it should produce the same answer every time.
+   *
+   * <p>The first call for a given offset pair drains tasks from the head of the queue, blocking
+   * (while holding this planner's monitor) until the end offset is reached or a background task
+   * fails. The result is cached so repeated calls with the same offsets do not consume the queue
+   * again.
+   *
+   * @param startOffset the starting offset of this microbatch, position is inclusive
+   * @param endOffset the end offset of this microbatch, position is exclusive
+   * @return the list of files to scan between these offsets
+   * @throws RuntimeException if a background refresh or queue fill failed, or if interrupted
+   */
+  @Override
+  public synchronized List<FileScanTask> planFiles(
+      StreamingOffset startOffset, StreamingOffset endOffset) {
+    return planFilesCache.get(
+        Pair.of(startOffset, endOffset),
+        key -> {
+          LOG.info(
+              "running planFiles for {}, startOffset: {}, endOffset: {}",
+              table().name(),
+              startOffset,
+              endOffset);
+          List<FileScanTask> result = new LinkedList<>();
+          Pair<StreamingOffset, FileScanTask> elem;
+          StreamingOffset currentOffset;
+          boolean shouldTerminate = false;
+          long filesInPlan = 0;
+          long rowsInPlan = 0;
+
+          do {
+            try {
+              elem = queue.pollFirst(QUEUE_POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException("Interrupted while polling queue", e);
+            }
+
+            if (elem != null) {
+              currentOffset = elem.first();
+              LOG.debug("planFiles consumed: {}", currentOffset);
+              FileScanTask currentTask = elem.second();
+              filesInPlan += 1;
+              long elemRows = currentTask.file().recordCount();
+              rowsInPlan += elemRows;
+              queuedFileCount.decrementAndGet();
+              queuedRowCount.addAndGet(-elemRows);
+              result.add(currentTask);
+
+              // try to peek at the next entry of the queue and see if we should stop
+              Pair<StreamingOffset, FileScanTask> nextElem = queue.peekFirst();
+              boolean endOffsetPeek = false;
+              if (nextElem != null) {
+                endOffsetPeek = endOffset.equals(nextElem.first());
+              }
+              // end offset may be synthetic and not exist in the queue
+              boolean endOffsetSynthetic =
+                  currentOffset.snapshotId() == endOffset.snapshotId()
+                      && (currentOffset.position() + 1) == endOffset.position();
+              shouldTerminate = endOffsetPeek || endOffsetSynthetic;
+            } else {
+              LOG.trace("planFiles hasn't reached {}, waiting", endOffset);
+            }
+          } while (!shouldTerminate
+              && refreshFailedThrowable == null
+              && fillQueueFailedThrowable == null);
+
+          if (refreshFailedThrowable != null) {
+            throw new RuntimeException("Table refresh failed", refreshFailedThrowable);
+          }
+
+          if (fillQueueFailedThrowable != null) {
+            throw new RuntimeException("Queue filling failed", fillQueueFailedThrowable);
+          }
+
+          LOG.info(
+              "completed planFiles for {}, startOffset: {}, endOffset: {}, files: {}, rows: {}",
+              table().name(),
+              startOffset,
+              endOffset,
+              filesInPlan,
+              rowsInPlan);
+          return result;
+        });
+  }
+
+  /**
+   * This needs to be non destructive on the queue as spark could call this multiple times. Each
+   * time, depending on the table state it could return something different
+   *
+   * @param startOffset the starting offset of the next microbatch
+   * @param limit a limit for how many files/bytes/rows the next microbatch should include
+   * @return The end offset to use for the next microbatch, null signals that no data is available
+   * @throws RuntimeException if a background refresh or queue fill has failed
+   */
+  @Override
+  public synchronized StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit) {
+    LOG.info(
+        "running latestOffset for {}, startOffset: {}, limit: {}",
+        table().name(),
+        startOffset,
+        limit);
+
+    // Read the current snapshot once: the background thread refreshes the table concurrently, so
+    // separate currentSnapshot() calls could observe different snapshots.
+    Snapshot currentSnapshot = table().currentSnapshot();
+    if (currentSnapshot == null) {
+      LOG.info("latestOffset returning START_OFFSET, currentSnapshot() is null");
+      return StreamingOffset.START_OFFSET;
+    }
+
+    if (currentSnapshot.timestampMillis() < readConf().streamFromTimestamp()) {
+      LOG.info("latestOffset returning START_OFFSET, currentSnapshot() < fromTimestamp");
+      return StreamingOffset.START_OFFSET;
+    }
+
+    // if any exceptions were encountered in the background process, raise them here
+    if (refreshFailedThrowable != null) {
+      throw new RuntimeException("Table refresh failed", refreshFailedThrowable);
+    }
+    if (fillQueueFailedThrowable != null) {
+      throw new RuntimeException("Queue filling failed", fillQueueFailedThrowable);
+    }
+
+    // if we want to read all available we don't need to scan files, just snapshots
+    if (limit instanceof ReadAllAvailable) {
+      // If Trigger.AvailableNow cap is set, return it directly
+      if (this.lastOffsetForTriggerAvailableNow != null) {
+        return this.lastOffsetForTriggerAvailableNow;
+      }
+      return latestAvailableOffset(startOffset);
+    }
+
+    return computeLimitedOffset(limit);
+  }
+
+  /**
+   * Walks forward from {@code startOffset}'s snapshot through every snapshot accepted by {@code
+   * nextValidSnapshot} and returns an offset just past the last file of the newest one.
+   *
+   * <p>If no snapshot can be resolved at all (the start snapshot is not found and no valid
+   * snapshot follows), {@code startOffset} is returned so Spark sees no new data instead of
+   * failing with a NullPointerException.
+   */
+  private StreamingOffset latestAvailableOffset(StreamingOffset startOffset) {
+    Snapshot lastValidSnapshot = table().snapshot(startOffset.snapshotId());
+    Snapshot candidate;
+    do {
+      candidate = nextValidSnapshot(lastValidSnapshot);
+      if (candidate != null) {
+        lastValidSnapshot = candidate;
+      }
+    } while (candidate != null);
+
+    if (lastValidSnapshot == null) {
+      LOG.info("latestOffset found no valid snapshot after {}, returning it", startOffset);
+      return startOffset;
+    }
+
+    return new StreamingOffset(
+        lastValidSnapshot.snapshotId(),
+        MicroBatchUtils.addedFilesCount(table(), lastValidSnapshot),
+        false);
+  }
+
+  /**
+   * Computes the end offset for a limited batch by walking a point-in-time copy of the queue; the
+   * queue itself is not modified.
+   *
+   * <p>The file limit is hard (the batch never exceeds it). The row limit is soft: a file is
+   * included before the row total is checked, so a single oversized file still makes progress.
+   *
+   * <p>NOTE(review): the walk starts at the queue head and ignores the batch's start offset; it
+   * assumes the head is where the next batch begins.
+   *
+   * @return the exclusive end offset, or null if the queue is empty or the file limit admits no
+   *     file at all
+   */
+  private StreamingOffset computeLimitedOffset(ReadLimit limit) {
+    UnpackedLimits unpackedLimits = new UnpackedLimits(limit);
+    long rowsSeen = 0;
+    long filesSeen = 0;
+    LOG.debug(
+        "latestOffset queue status, queuedFiles: {}, queuedRows: {}",
+        queuedFileCount.get(),
+        queuedRowCount.get());
+
+    List<Pair<StreamingOffset, FileScanTask>> queueSnapshot = Lists.newArrayList(queue);
+    Pair<StreamingOffset, FileScanTask> queueTail =
+        queueSnapshot.isEmpty() ? null : queueSnapshot.get(queueSnapshot.size() - 1);
+
+    for (int i = 0; i < queueSnapshot.size(); i++) {
+      Pair<StreamingOffset, FileScanTask> elem = queueSnapshot.get(i);
+      long fileRows = elem.second().file().recordCount();
+
+      // Hard limit on files - stop BEFORE exceeding
+      if (filesSeen + 1 > unpackedLimits.getMaxFiles()) {
+        if (filesSeen == 0) {
+          return null;
+        }
+        LOG.debug(
+            "latestOffset hit file limit at {}, rows: {}, files: {}",
+            elem.first(),
+            rowsSeen,
+            filesSeen);
+        return elem.first();
+      }
+
+      // Soft limit on rows - include file FIRST, then check
+      rowsSeen += fileRows;
+      filesSeen += 1;
+
+      // Check if we've hit the row limit after including this file
+      if (rowsSeen >= unpackedLimits.getMaxRows()) {
+        if (filesSeen == 1 && rowsSeen > unpackedLimits.getMaxRows()) {
+          LOG.warn(
+              "File {} at offset {} contains {} records, exceeding maxRecordsPerMicroBatch limit of {}. "
+                  + "This file will be processed entirely to guarantee forward progress. "
+                  + "Consider increasing the limit or writing smaller files to avoid unexpected memory usage.",
+              elem.second().file().location(),
+              elem.first(),
+              fileRows,
+              unpackedLimits.getMaxRows());
+        }
+        // Return the offset of the NEXT element (or synthesize tail+1)
+        if (i + 1 < queueSnapshot.size()) {
+          LOG.debug(
+              "latestOffset hit row limit at {}, rows: {}, files: {}",
+              queueSnapshot.get(i + 1).first(),
+              rowsSeen,
+              filesSeen);
+          return queueSnapshot.get(i + 1).first();
+        } else {
+          // This is the last element - return tail+1
+          StreamingOffset current = elem.first();
+          StreamingOffset result =
+              new StreamingOffset(
+                  current.snapshotId(), current.position() + 1, current.shouldScanAllFiles());
+          LOG.debug(
+              "latestOffset hit row limit at tail {}, rows: {}, files: {}",
+              result,
+              rowsSeen,
+              filesSeen);
+          return result;
+        }
+      }
+    }
+
+    // if we got here there aren't enough files to exceed our limits
+    if (queueTail != null) {
+      StreamingOffset tailOffset = queueTail.first();
+      // we have to increment the position by 1 since we want to include the tail in the read and
+      // position is non-inclusive
+      StreamingOffset latestOffset =
+          new StreamingOffset(
+              tailOffset.snapshotId(), tailOffset.position() + 1, tailOffset.shouldScanAllFiles());
+      LOG.debug("latestOffset returning all queued data {}", latestOffset);
+      return latestOffset;
+    }
+
+    // if we got here the queue is empty
+    LOG.debug("latestOffset no data, returning null");
+    return null;
+  }
+
+  // Background task: refreshes the table, recording any failure for planFiles/latestOffset
+  private void refreshAndTrapException() {
+    try {
+      table().refresh();
+    } catch (Throwable t) {
+      LOG.error("Failed to refresh table {}", table().name(), t);
+      refreshFailedThrowable = t;
+    }
+  }
+
+  // Background task: tops up the queue, recording any failure for planFiles/latestOffset
+  private void fillQueueAndTrapException(Snapshot snapshot) {
+    try {
+      fillQueue(snapshot);
+    } catch (Throwable t) {
+      LOG.error("Failed to fill queue for table {}", table().name(), t);
+      fillQueueFailedThrowable = t;
+    }
+  }
+
+  /**
+   * Plans the files of {@code snapshot} between {@code startFileIndex} and {@code endFileIndex},
+   * appends each task to the queue tagged with its offset, updates the queued file/row totals and
+   * records {@code snapshot} as the last queued snapshot.
+   */
+  private void addMicroBatchToQueue(
+      Snapshot snapshot, long startFileIndex, long endFileIndex, boolean shouldScanAllFile) {
+    LOG.info("Adding MicroBatch for snapshot: {} to the queue", snapshot.snapshotId());
+    MicroBatches.MicroBatch microBatch =
+        MicroBatches.from(snapshot, table().io())
+            .caseSensitive(readConf().caseSensitive())
+            .specsById(table().specs())
+            .generate(startFileIndex, endFileIndex, Long.MAX_VALUE, shouldScanAllFile);
+
+    long position = startFileIndex;
+    for (FileScanTask task : microBatch.tasks()) {
+      Pair<StreamingOffset, FileScanTask> elem =
+          Pair.of(new StreamingOffset(microBatch.snapshotId(), position, shouldScanAllFile), task);
+      queuedFileCount.incrementAndGet();
+      queuedRowCount.addAndGet(task.file().recordCount());
+      queue.addLast(elem);
+      position += 1;
+    }
+    if (LOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("\n");
+      for (Pair<StreamingOffset, FileScanTask> elem : queue) {
+        sb.append(elem.first()).append("\n");
+      }
+      LOG.debug(sb.toString());
+    }
+    lastQueuedSnapshot = snapshot;
+  }
+
+  /**
+   * Synchronous preload used by the constructor. Queues the remainder of {@code fromOffset}'s
+   * snapshot, then either every valid snapshot up to {@code toOffset}'s snapshot or, when {@code
+   * toOffset} is null, an initial buffer bounded by the preload limits.
+   *
+   * <p>NOTE(review): when {@code fromOffset}'s snapshot cannot be found (presumably START_OFFSET)
+   * and {@code toOffset} is non-null, nothing is preloaded here; the background fill then starts
+   * from the first valid snapshot.
+   */
+  private void fillQueue(StreamingOffset fromOffset, StreamingOffset toOffset) {
+    LOG.debug("filling queue from {}, to: {}", fromOffset, toOffset);
+    Snapshot currentSnapshot = table().snapshot(fromOffset.snapshotId());
+    // this could be a partial snapshot so add it outside the loop
+    if (currentSnapshot != null) {
+      addMicroBatchToQueue(
+          currentSnapshot,
+          fromOffset.position(),
+          MicroBatchUtils.addedFilesCount(table(), currentSnapshot),
+          fromOffset.shouldScanAllFiles());
+    }
+    if (toOffset != null) {
+      if (currentSnapshot != null) {
+        while (currentSnapshot.snapshotId() != toOffset.snapshotId()) {
+          currentSnapshot = nextValidSnapshot(currentSnapshot);
+          if (currentSnapshot != null) {
+            addMicroBatchToQueue(
+                currentSnapshot, 0, MicroBatchUtils.addedFilesCount(table(), currentSnapshot), false);
+          } else {
+            break;
+          }
+        }
+      }
+      // toOffset snapshot already added in loop when currentSnapshot == toOffset
+    } else {
+      fillQueueInitialBuffer(currentSnapshot);
+    }
+  }
+
+  /**
+   * Preloads whole snapshots after {@code startSnapshot} until both the row and file preload
+   * targets are reached or the preload end snapshot (AvailableNow cap or current snapshot) is
+   * queued. A null {@code startSnapshot} starts from the first valid snapshot.
+   */
+  private void fillQueueInitialBuffer(Snapshot startSnapshot) {
+    // toOffset is null - fill initial buffer to prevent queue starvation before background
+    // thread starts. Use configured limits to avoid loading all snapshots
+    // (which could cause OOM on tables with thousands of snapshots).
+    long targetRows = readConf().asyncQueuePreloadRowLimit();
+    long targetFiles = readConf().asyncQueuePreloadFileLimit();
+
+    Snapshot preloadEndSnapshot = initialPreloadEndSnapshot();
+    if (preloadEndSnapshot == null) {
+      return; // Empty table
+    }
+
+    // START_OFFSET case: initialize using nextValidSnapshot which respects timestamp filtering
+    Snapshot current = startSnapshot;
+    if (current == null) {
+      current = nextValidSnapshot(null);
+      if (current != null) {
+        addMicroBatchToQueue(current, 0, MicroBatchUtils.addedFilesCount(table(), current), false);
+      }
+    }
+
+    // Continue loading more snapshots within safety limits
+    if (current != null) {
+      while ((queuedRowCount.get() < targetRows || queuedFileCount.get() < targetFiles)
+          && current.snapshotId() != preloadEndSnapshot.snapshotId()) {
+        current = nextValidSnapshot(current);
+        if (current != null) {
+          addMicroBatchToQueue(
+              current, 0, MicroBatchUtils.addedFilesCount(table(), current), false);
+        } else {
+          break;
+        }
+      }
+    }
+  }
+
+  // Last snapshot the initial preload may reach: the AvailableNow cap if set, else the current one
+  private Snapshot initialPreloadEndSnapshot() {
+    if (lastOffsetForTriggerAvailableNow != null) {
+      return table().snapshot(lastOffsetForTriggerAvailableNow.snapshotId());
+    }
+
+    return table().currentSnapshot();
+  }
+
+  /** Returns true when {@code readFrom} is the Trigger.AvailableNow cap snapshot. */
+  @VisibleForTesting
+  static boolean reachedAvailableNowCap(
+      Snapshot readFrom, StreamingOffset lastOffsetForTriggerAvailableNow) {
+    return lastOffsetForTriggerAvailableNow != null
+        && readFrom != null
+        && readFrom.snapshotId() == lastOffsetForTriggerAvailableNow.snapshotId();
+  }
+
+  /**
+   * Try to populate the queue with data from unread snapshots: queues the whole next valid
+   * snapshot after {@code readFrom}, unless the AvailableNow cap was reached or enough data is
+   * already buffered.
+   */
+  private void fillQueue(Snapshot readFrom) {
+    // Don't add beyond cap for Trigger.AvailableNow
+    if (reachedAvailableNowCap(readFrom, lastOffsetForTriggerAvailableNow)) {
+      LOG.debug(
+          "Reached cap snapshot {}, not adding more",
+          this.lastOffsetForTriggerAvailableNow.snapshotId());
+      return;
+    }
+
+    if ((queuedRowCount.get() > minQueuedRows) || (queuedFileCount.get() > minQueuedFiles)) {
+      // we have enough data buffered, check back shortly
+      LOG.debug(
+          "Buffer is full, {} > {} or {} > {}",
+          queuedRowCount.get(),
+          minQueuedRows,
+          queuedFileCount.get(),
+          minQueuedFiles);
+    } else {
+      // add an entire snapshot to the queue
+      Snapshot nextValidSnapshot = nextValidSnapshot(readFrom);
+      if (nextValidSnapshot != null) {
+        addMicroBatchToQueue(
+            nextValidSnapshot,
+            0,
+            MicroBatchUtils.addedFilesCount(table(), nextValidSnapshot),
+            false);
+      } else {
+        LOG.debug("No snapshots ready to be read");
+      }
+    }
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..9298c2bbdf
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/BaseSparkMicroBatchPlanner.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.Locale;
+import org.apache.iceberg.DataOperations;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
+import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common base for micro-batch planners: holds the table and read configuration, and provides the
+ * snapshot-filtering and read-limit helpers shared by the planner implementations.
+ */
+abstract class BaseSparkMicroBatchPlanner implements SparkMicroBatchPlanner {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseSparkMicroBatchPlanner.class);
+
+  private final Table table;
+  private final SparkReadConf readConf;
+
+  BaseSparkMicroBatchPlanner(Table table, SparkReadConf readConf) {
+    this.table = table;
+    this.readConf = readConf;
+  }
+
+  /** Returns the table being streamed. */
+  protected Table table() {
+    return table;
+  }
+
+  /** Returns the read configuration this planner was created with. */
+  protected SparkReadConf readConf() {
+    return readConf;
+  }
+
+  /**
+   * Decides whether the data added by {@code snapshot} should be streamed.
+   *
+   * <p>Appends are always read and replaces are always skipped. Deletes and overwrites are skipped
+   * only when the matching skip option is enabled; otherwise the check fails.
+   *
+   * @throws IllegalStateException for a delete/overwrite snapshot whose skip option is disabled,
+   *     or for an unrecognized operation
+   */
+  protected boolean shouldProcess(Snapshot snapshot) {
+    String operation = snapshot.operation();
+    switch (operation) {
+      case DataOperations.APPEND:
+        return true;
+
+      case DataOperations.REPLACE:
+        return false;
+
+      case DataOperations.DELETE:
+        Preconditions.checkState(
+            readConf.streamingSkipDeleteSnapshots(),
+            "Cannot process delete snapshot: %s, to ignore deletes, set %s=true",
+            snapshot.snapshotId(),
+            SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
+        return false;
+
+      case DataOperations.OVERWRITE:
+        Preconditions.checkState(
+            readConf.streamingSkipOverwriteSnapshots(),
+            "Cannot process overwrite snapshot: %s, to ignore overwrites, set %s=true",
+            snapshot.snapshotId(),
+            SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
+        return false;
+
+      default:
+        String message =
+            String.format(
+                "Cannot process unknown snapshot operation: %s (snapshot id %s)",
+                operation.toLowerCase(Locale.ROOT), snapshot.snapshotId());
+        throw new IllegalStateException(message);
+    }
+  }
+
+  /**
+   * Finds the next snapshot after {@code curSnapshot} whose changes {@link #shouldProcess} accepts.
+   * Async callers must handle a null result.
+   *
+   * <p>When {@code curSnapshot} is null, the search starts from the offset picked by {@code
+   * MicroBatchUtils.determineStartingOffset} for the configured start timestamp.
+   *
+   * @param curSnapshot the snapshot to continue after, or null to start from the beginning
+   * @return the next processable snapshot, or null if there is none up to the current snapshot
+   */
+  protected Snapshot nextValidSnapshot(Snapshot curSnapshot) {
+    Snapshot candidate;
+    if (curSnapshot != null) {
+      if (isCurrentSnapshot(curSnapshot)) {
+        return null;
+      }
+      candidate = SnapshotUtil.snapshotAfter(table, curSnapshot.snapshotId());
+    } else {
+      // no valid snapshot seen yet, so look for an initial offset again
+      StreamingOffset startingOffset =
+          MicroBatchUtils.determineStartingOffset(table, readConf.streamFromTimestamp());
+      LOG.debug("determineStartingOffset picked startingOffset: {}", startingOffset);
+      if (StreamingOffset.START_OFFSET.equals(startingOffset)) {
+        return null;
+      }
+      candidate = table.snapshot(startingOffset.snapshotId());
+    }
+
+    // step over snapshots that shouldProcess rejects (e.g. replace / skipped delete snapshots)
+    while (!shouldProcess(candidate)) {
+      LOG.debug("Skipping snapshot: {}", candidate);
+      // stop at the current snapshot: snapshotAfter would throw since nothing follows it
+      if (isCurrentSnapshot(candidate)) {
+        return null;
+      }
+      candidate = SnapshotUtil.snapshotAfter(table, candidate.snapshotId());
+    }
+    return candidate;
+  }
+
+  // True when the given snapshot is the table's current snapshot, i.e. nothing comes after it
+  private boolean isCurrentSnapshot(Snapshot snapshot) {
+    return snapshot.snapshotId() == table.currentSnapshot().snapshotId();
+  }
+
+  /**
+   * Flattens a Spark {@link ReadLimit} into a row cap and a file cap. A cap that the limit does not
+   * specify stays at {@code Integer.MAX_VALUE}; other limit kinds are ignored.
+   */
+  static class UnpackedLimits {
+    private long maxRows = Integer.MAX_VALUE;
+    private long maxFiles = Integer.MAX_VALUE;
+
+    UnpackedLimits(ReadLimit limit) {
+      if (limit instanceof ReadMaxRows) {
+        this.maxRows = ((ReadMaxRows) limit).maxRows();
+      } else if (limit instanceof ReadMaxFiles) {
+        this.maxFiles = ((ReadMaxFiles) limit).maxFiles();
+      } else if (limit instanceof CompositeReadLimit) {
+        for (ReadLimit component : ((CompositeReadLimit) limit).getReadLimits()) {
+          tighten(component);
+        }
+      }
+    }
+
+    // Lowers the matching cap to honour one component of a composite limit
+    private void tighten(ReadLimit component) {
+      if (component instanceof ReadMaxRows) {
+        this.maxRows = Math.min(this.maxRows, ((ReadMaxRows) component).maxRows());
+      } else if (component instanceof ReadMaxFiles) {
+        this.maxFiles = Math.min(this.maxFiles, ((ReadMaxFiles) component).maxFiles());
+      }
+    }
+
+    public long getMaxRows() {
+      return maxRows;
+    }
+
+    public long getMaxFiles() {
+      return maxFiles;
+    }
+  }
+}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
new file mode 100644
index 0000000000..7c73e3f416
--- /dev/null
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/MicroBatchUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotChanges;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+
+/**
+ * Static helpers shared by {@link SparkMicroBatchStream} and the micro-batch planners for
+ * computing starting offsets and per-snapshot added-file counts.
+ */
+class MicroBatchUtils {
+
+ private MicroBatchUtils() {}
+
+ /**
+ * Returns the offset a stream should start reading from.
+ *
+ * <ul>
+ * <li>{@link StreamingOffset#START_OFFSET} if the table has no current snapshot, if the
+ * current snapshot's timestamp is before {@code fromTimestamp}, or if {@code
+ * SnapshotUtil.oldestAncestorAfter} returns {@code null};
+ * <li>the oldest ancestor when {@code fromTimestamp} is {@code Long.MIN_VALUE}, or when {@code
+ * SnapshotUtil.oldestAncestorAfter} throws {@link IllegalStateException};
+ * <li>otherwise the snapshot returned by {@code SnapshotUtil.oldestAncestorAfter}.
+ * </ul>
+ *
+ * <p>Every offset returned other than START_OFFSET is at position 0 with {@code
+ * scanAllFiles=false}.
+ *
+ * @param table the table being streamed
+ * @param fromTimestamp start timestamp in millis (compared against snapshot timestampMillis);
+ * {@code Long.MIN_VALUE} means "no start timestamp"
+ * @return the starting offset; never {@code null}
+ */
+ static StreamingOffset determineStartingOffset(Table table, long
fromTimestamp) {
+ if (table.currentSnapshot() == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (fromTimestamp == Long.MIN_VALUE) {
+ // Long.MIN_VALUE means "no start timestamp" (the default): go straight to the oldest
+ // ancestor instead of searching for the first snapshot after the timestamp.
+ return new
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+ }
+
+ if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ try {
+ Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table,
fromTimestamp);
+ if (snapshot != null) {
+ return new StreamingOffset(snapshot.snapshotId(), 0, false);
+ } else {
+ return StreamingOffset.START_OFFSET;
+ }
+ } catch (IllegalStateException e) {
+ // Could not determine the first snapshot after the timestamp; use the oldest ancestor instead.
+ return new
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+ }
+ }
+
+ /**
+ * Returns the number of data files added by {@code snapshot}.
+ *
+ * <p>Uses the {@link SnapshotSummary#ADDED_FILES_PROP} summary entry when present. Otherwise it
+ * counts {@code addedDataFiles()} via {@link SnapshotChanges}. That path is presumably much more
+ * expensive because it must read snapshot metadata (TODO confirm).
+ *
+ * @param table the table that owns {@code snapshot}
+ * @param snapshot the snapshot to inspect
+ * @return the number of added data files
+ */
+ static long addedFilesCount(Table table, Snapshot snapshot) {
+ long addedFilesCount =
+ PropertyUtil.propertyAsLong(snapshot.summary(),
SnapshotSummary.ADDED_FILES_PROP, -1);
+ // -1 is the sentinel for "summary entry missing".
+ return addedFilesCount == -1
+ ? Iterables.size(
+
SnapshotChanges.builderFor(table).snapshot(snapshot).build().addedDataFiles())
+ : addedFilesCount;
+ }
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..1986ddac5d
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchPlanner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.util.List;
+import org.apache.iceberg.FileScanTask;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+
+/**
+ * Plans file scan tasks and end offsets for Spark micro-batches over an Iceberg table.
+ *
+ * <p>{@link SparkMicroBatchStream} creates either {@link SyncSparkMicroBatchPlanner} or {@link
+ * AsyncSparkMicroBatchPlanner}, depending on {@code SparkReadConf#asyncMicroBatchPlanningEnabled()}.
+ */
+interface SparkMicroBatchPlanner {
+ /**
+ * Return the {@link FileScanTask}s for data added between the start and end
offsets.
+ *
+ * <p>The synchronous implementation treats the end offset's position as exclusive.
+ *
+ * @param startOffset the offset to start planning from
+ * @param endOffset the offset to plan up to
+ * @return file scan tasks for data in the offset range
+ */
+ List<FileScanTask> planFiles(StreamingOffset startOffset, StreamingOffset
endOffset);
+
+ /**
+ * Return the latest offset the stream can advance to from {@code
startOffset}, respecting the
+ * given {@link ReadLimit}.
+ *
+ * <p>The synchronous implementation also returns {@link StreamingOffset#START_OFFSET} in two
+ * cases: when the table has no snapshot yet, or when its current snapshot predates the configured
+ * start timestamp.
+ *
+ * @param startOffset the current offset of the stream
+ * @param limit the read limit bounding how far ahead to advance
+ * @return the latest available offset, or {@code null} if no new data is
available
+ */
+ StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit limit);
+
+ /**
+ * Stop the planner and release any resources.
+ *
+ * <p>{@link SparkMicroBatchStream} calls this when the stream stops. It also calls it when the
+ * stream discards the planner so that a new one can be created.
+ */
+ void stop();
+}
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
index 06bc16bba8..a1ff767fe2 100644
---
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java
@@ -26,48 +26,32 @@ import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
-import java.util.Locale;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataOperations;
import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.MicroBatches;
-import org.apache.iceberg.MicroBatches.MicroBatch;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.SnapshotChanges;
-import org.apache.iceberg.SnapshotSummary;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.SparkReadConf;
-import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.PropertyUtil;
-import org.apache.iceberg.util.SnapshotUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
-import org.apache.spark.sql.connector.read.streaming.CompositeReadLimit;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.ReadLimit;
-import org.apache.spark.sql.connector.read.streaming.ReadMaxFiles;
-import org.apache.spark.sql.connector.read.streaming.ReadMaxRows;
import
org.apache.spark.sql.connector.read.streaming.SupportsTriggerAvailableNow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,6 +63,7 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
private final Table table;
private final Supplier<FileIO> fileIO;
+ private final SparkReadConf readConf;
private final String branch;
private final boolean caseSensitive;
private final String expectedSchema;
@@ -89,12 +74,11 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
private final long splitOpenFileCost;
private final boolean localityPreferred;
private final StreamingOffset initialOffset;
- private final boolean skipDelete;
- private final boolean skipOverwrite;
private final long fromTimestamp;
private final int maxFilesPerMicroBatch;
private final int maxRecordsPerMicroBatch;
private final boolean cacheDeleteFilesOnExecutors;
+ private SparkMicroBatchPlanner planner;
private StreamingOffset lastOffsetForTriggerAvailableNow;
SparkMicroBatchStream(
@@ -106,6 +90,7 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
String checkpointLocation) {
this.table = table;
this.fileIO = fileIO;
+ this.readConf = readConf;
this.branch = readConf.branch();
this.caseSensitive = readConf.caseSensitive();
this.expectedSchema = SchemaParser.toJson(expectedSchema);
@@ -124,9 +109,6 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
new InitialOffsetStore(
table, checkpointLocation, fromTimestamp,
sparkContext.hadoopConfiguration());
this.initialOffset = initialOffsetStore.initialOffset();
-
- this.skipDelete = readConf.streamingSkipDeleteSnapshots();
- this.skipOverwrite = readConf.streamingSkipOverwriteSnapshots();
}
@Override
@@ -141,8 +123,8 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
}
Snapshot latestSnapshot = table.currentSnapshot();
-
- return new StreamingOffset(latestSnapshot.snapshotId(),
addedFilesCount(latestSnapshot), false);
+ return new StreamingOffset(
+ latestSnapshot.snapshotId(), MicroBatchUtils.addedFilesCount(table,
latestSnapshot), false);
}
@Override
@@ -161,7 +143,11 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
StreamingOffset endOffset = (StreamingOffset) end;
StreamingOffset startOffset = (StreamingOffset) start;
- List<FileScanTask> fileScanTasks = planFiles(startOffset, endOffset);
+ if (planner == null) {
+ initializePlanner(startOffset, endOffset);
+ }
+
+ List<FileScanTask> fileScanTasks = planner.planFiles(startOffset,
endOffset);
CloseableIterable<FileScanTask> splitTasks =
TableScanUtil.splitFiles(CloseableIterable.withNoopClose(fileScanTasks),
splitSize);
@@ -171,7 +157,6 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
String[][] locations = computePreferredLocations(combinedScanTasks);
InputPartition[] partitions = new InputPartition[combinedScanTasks.size()];
-
for (int index = 0; index < combinedScanTasks.size(); index++) {
partitions[index] =
new SparkInputPartition(
@@ -214,318 +199,35 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
public void commit(Offset end) {}
@Override
- public void stop() {}
-
- private List<FileScanTask> planFiles(StreamingOffset startOffset,
StreamingOffset endOffset) {
- List<FileScanTask> fileScanTasks = Lists.newArrayList();
- StreamingOffset batchStartOffset =
- StreamingOffset.START_OFFSET.equals(startOffset)
- ? determineStartingOffset(table, fromTimestamp)
- : startOffset;
-
- StreamingOffset currentOffset = null;
-
- // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
- do {
- long endFileIndex;
- if (currentOffset == null) {
- currentOffset = batchStartOffset;
- } else {
- Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table,
currentOffset.snapshotId());
- // it may happen that we need to read this snapshot partially in case
it's equal to
- // endOffset.
- if (currentOffset.snapshotId() != endOffset.snapshotId()) {
- currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L,
false);
- } else {
- currentOffset = endOffset;
- }
- }
-
- Snapshot snapshot = table.snapshot(currentOffset.snapshotId());
-
- validateCurrentSnapshotExists(snapshot, currentOffset);
-
- if (!shouldProcess(snapshot)) {
- LOG.debug("Skipping snapshot: {} of table {}",
currentOffset.snapshotId(), table.name());
- continue;
- }
-
- Snapshot currentSnapshot = table.snapshot(currentOffset.snapshotId());
- if (currentOffset.snapshotId() == endOffset.snapshotId()) {
- endFileIndex = endOffset.position();
- } else {
- endFileIndex = addedFilesCount(currentSnapshot);
- }
-
- MicroBatch latestMicroBatch =
- MicroBatches.from(currentSnapshot, table.io())
- .caseSensitive(caseSensitive)
- .specsById(table.specs())
- .generate(
- currentOffset.position(),
- endFileIndex,
- Long.MAX_VALUE,
- currentOffset.shouldScanAllFiles());
-
- fileScanTasks.addAll(latestMicroBatch.tasks());
- } while (currentOffset.snapshotId() != endOffset.snapshotId());
-
- return fileScanTasks;
- }
-
- private boolean shouldProcess(Snapshot snapshot) {
- String op = snapshot.operation();
- switch (op) {
- case DataOperations.APPEND:
- return true;
- case DataOperations.REPLACE:
- return false;
- case DataOperations.DELETE:
- Preconditions.checkState(
- skipDelete,
- "Cannot process delete snapshot: %s, to ignore deletes, set
%s=true",
- snapshot.snapshotId(),
- SparkReadOptions.STREAMING_SKIP_DELETE_SNAPSHOTS);
- return false;
- case DataOperations.OVERWRITE:
- Preconditions.checkState(
- skipOverwrite,
- "Cannot process overwrite snapshot: %s, to ignore overwrites, set
%s=true",
- snapshot.snapshotId(),
- SparkReadOptions.STREAMING_SKIP_OVERWRITE_SNAPSHOTS);
- return false;
- default:
- throw new IllegalStateException(
- String.format(
- "Cannot process unknown snapshot operation: %s (snapshot id
%s)",
- op.toLowerCase(Locale.ROOT), snapshot.snapshotId()));
- }
- }
-
- private static StreamingOffset determineStartingOffset(Table table, Long
fromTimestamp) {
- if (table.currentSnapshot() == null) {
- return StreamingOffset.START_OFFSET;
- }
-
- if (fromTimestamp == null) {
- // match existing behavior and start from the oldest snapshot
- return new
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
- }
-
- if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
- return StreamingOffset.START_OFFSET;
- }
-
- try {
- Snapshot snapshot = SnapshotUtil.oldestAncestorAfter(table,
fromTimestamp);
- if (snapshot != null) {
- return new StreamingOffset(snapshot.snapshotId(), 0, false);
- } else {
- return StreamingOffset.START_OFFSET;
- }
- } catch (IllegalStateException e) {
- // could not determine the first snapshot after the timestamp. use the
oldest ancestor instead
- return new
StreamingOffset(SnapshotUtil.oldestAncestor(table).snapshotId(), 0, false);
+ public void stop() {
+ if (planner != null) {
+ planner.stop();
}
}
- private static int getMaxFiles(ReadLimit readLimit) {
- if (readLimit instanceof ReadMaxFiles) {
- return ((ReadMaxFiles) readLimit).maxFiles();
- }
-
- if (readLimit instanceof CompositeReadLimit) {
- // We do not expect a CompositeReadLimit to contain a nested
CompositeReadLimit.
- // In fact, it should only be a composite of two or more of ReadMinRows,
ReadMaxRows and
- // ReadMaxFiles, with no more than one of each.
- ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
- for (ReadLimit limit : limits) {
- if (limit instanceof ReadMaxFiles) {
- return ((ReadMaxFiles) limit).maxFiles();
- }
- }
- }
-
- // there is no ReadMaxFiles, so return the default
- return Integer.MAX_VALUE;
- }
-
- private static int getMaxRows(ReadLimit readLimit) {
- if (readLimit instanceof ReadMaxRows) {
- long maxRows = ((ReadMaxRows) readLimit).maxRows();
- return Math.toIntExact(maxRows);
- }
-
- if (readLimit instanceof CompositeReadLimit) {
- ReadLimit[] limits = ((CompositeReadLimit) readLimit).getReadLimits();
- for (ReadLimit limit : limits) {
- if (limit instanceof ReadMaxRows) {
- long maxRows = ((ReadMaxRows) limit).maxRows();
- return Math.toIntExact(maxRows);
- }
- }
+ /**
+ * Creates the planner on first use. Callers invoke this lazily while {@code planner} is null.
+ *
+ * <p>Uses {@link AsyncSparkMicroBatchPlanner} when async micro-batch planning is enabled in the
+ * read conf, and {@link SyncSparkMicroBatchPlanner} otherwise. The current value of {@code
+ * lastOffsetForTriggerAvailableNow} is passed to the planner's constructor. That is why the
+ * Trigger.AvailableNow setup in this class stops and discards any existing planner after
+ * computing that offset.
+ *
+ * @param startOffset the offset being planned from
+ * @param endOffset the end offset, or {@code null} when called from {@code latestOffset(Offset,
+ * ReadLimit)}
+ */
+ private void initializePlanner(StreamingOffset startOffset, StreamingOffset
endOffset) {
+ if (readConf.asyncMicroBatchPlanningEnabled()) {
+ this.planner =
+ new AsyncSparkMicroBatchPlanner(
+ table, readConf, startOffset, endOffset,
lastOffsetForTriggerAvailableNow);
+ } else {
+ this.planner =
+ new SyncSparkMicroBatchPlanner(table, readConf,
lastOffsetForTriggerAvailableNow);
+ }
-
- // There is no ReadMaxRows, so return the default
- return Integer.MAX_VALUE;
}
@Override
- @SuppressWarnings("checkstyle:CyclomaticComplexity")
public Offset latestOffset(Offset startOffset, ReadLimit limit) {
- // calculate end offset get snapshotId from the startOffset
Preconditions.checkArgument(
startOffset instanceof StreamingOffset,
"Invalid start offset: %s is not a StreamingOffset",
startOffset);
- table.refresh();
- if (table.currentSnapshot() == null) {
- return StreamingOffset.START_OFFSET;
- }
-
- if (table.currentSnapshot().timestampMillis() < fromTimestamp) {
- return StreamingOffset.START_OFFSET;
+ if (planner == null) {
+ initializePlanner((StreamingOffset) startOffset, null);
}
- // end offset can expand to multiple snapshots
- StreamingOffset startingOffset = (StreamingOffset) startOffset;
-
- if (startOffset.equals(StreamingOffset.START_OFFSET)) {
- startingOffset = determineStartingOffset(table, fromTimestamp);
- }
-
- Snapshot curSnapshot = table.snapshot(startingOffset.snapshotId());
- validateCurrentSnapshotExists(curSnapshot, startingOffset);
-
- // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
- long latestSnapshotId =
- lastOffsetForTriggerAvailableNow != null
- ? lastOffsetForTriggerAvailableNow.snapshotId()
- : table.currentSnapshot().snapshotId();
-
- int startPosOfSnapOffset = (int) startingOffset.position();
-
- boolean scanAllFiles = startingOffset.shouldScanAllFiles();
-
- boolean shouldContinueReading = true;
- int curFilesAdded = 0;
- long curRecordCount = 0;
- long curPos = 0;
-
- // Note : we produce nextOffset with pos as non-inclusive
- while (shouldContinueReading) {
- // generate manifest index for the curSnapshot
- List<Pair<ManifestFile, Integer>> indexedManifests =
- MicroBatches.skippedManifestIndexesFromSnapshot(
- table.io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
- // this is under assumption we will be able to add at-least 1 file in
the new offset
- for (int idx = 0; idx < indexedManifests.size() &&
shouldContinueReading; idx++) {
- // be rest assured curPos >= startFileIndex
- curPos = indexedManifests.get(idx).second();
- try (CloseableIterable<FileScanTask> taskIterable =
- MicroBatches.openManifestFile(
- table.io(),
- table.specs(),
- caseSensitive,
- curSnapshot,
- indexedManifests.get(idx).first(),
- scanAllFiles);
- CloseableIterator<FileScanTask> taskIter =
taskIterable.iterator()) {
- while (taskIter.hasNext()) {
- FileScanTask task = taskIter.next();
- if (curPos >= startPosOfSnapOffset) {
- if ((curFilesAdded + 1) > getMaxFiles(limit)) {
- // On including the file it might happen that we might exceed,
the configured
- // soft limit on the number of records, since this is a soft
limit its acceptable.
- shouldContinueReading = false;
- break;
- }
-
- curFilesAdded += 1;
- curRecordCount += task.file().recordCount();
-
- if (curRecordCount >= getMaxRows(limit)) {
- // we included the file, so increment the number of files
- // read in the current snapshot.
- ++curPos;
- shouldContinueReading = false;
- break;
- }
- }
- ++curPos;
- }
- } catch (IOException ioe) {
- LOG.warn("Failed to close task iterable", ioe);
- }
- }
- // if the currentSnapShot was also the latestSnapshot then break
- if (curSnapshot.snapshotId() == latestSnapshotId) {
- break;
- }
-
- // if everything was OK and we consumed complete snapshot then move to
next snapshot
- if (shouldContinueReading) {
- Snapshot nextValid = nextValidSnapshot(curSnapshot);
- if (nextValid == null) {
- // nextValid implies all the remaining snapshots should be skipped.
- break;
- }
- // we found the next available snapshot, continue from there.
- curSnapshot = nextValid;
- startPosOfSnapOffset = -1;
- // if anyhow we are moving to next snapshot we should only scan
addedFiles
- scanAllFiles = false;
- }
- }
-
- StreamingOffset latestStreamingOffset =
- new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
-
- // if no new data arrived, then return null.
- return latestStreamingOffset.equals(startingOffset) ? null :
latestStreamingOffset;
- }
-
- /**
- * Get the next snapshot skiping over rewrite and delete snapshots.
- *
- * @param curSnapshot the current snapshot
- * @return the next valid snapshot (not a rewrite or delete snapshot),
returns null if all
- * remaining snapshots should be skipped.
- */
- private Snapshot nextValidSnapshot(Snapshot curSnapshot) {
- Snapshot nextSnapshot = SnapshotUtil.snapshotAfter(table,
curSnapshot.snapshotId());
- // skip over rewrite and delete snapshots
- while (!shouldProcess(nextSnapshot)) {
- LOG.debug("Skipping snapshot: {} of table {}",
nextSnapshot.snapshotId(), table.name());
- // if the currentSnapShot was also the mostRecentSnapshot then break
- if (nextSnapshot.snapshotId() == table.currentSnapshot().snapshotId()) {
- return null;
- }
- nextSnapshot = SnapshotUtil.snapshotAfter(table,
nextSnapshot.snapshotId());
- }
- return nextSnapshot;
- }
-
- private long addedFilesCount(Snapshot snapshot) {
- long addedFilesCount =
- PropertyUtil.propertyAsLong(snapshot.summary(),
SnapshotSummary.ADDED_FILES_PROP, -1);
- // If snapshotSummary doesn't have SnapshotSummary.ADDED_FILES_PROP,
- // iterate through addedFiles iterator to find addedFilesCount.
- return addedFilesCount == -1
- ? Iterables.size(
-
SnapshotChanges.builderFor(table).snapshot(snapshot).build().addedDataFiles())
- : addedFilesCount;
- }
-
- private void validateCurrentSnapshotExists(Snapshot snapshot,
StreamingOffset currentOffset) {
- if (snapshot == null) {
- throw new IllegalStateException(
- String.format(
- Locale.ROOT,
- "Cannot load current offset at snapshot %d, the snapshot was
expired or removed",
- currentOffset.snapshotId()));
- }
+ return planner.latestOffset((StreamingOffset) startOffset, limit);
}
@Override
@@ -553,6 +255,11 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
(StreamingOffset) latestOffset(initialOffset,
ReadLimit.allAvailable());
LOG.info("lastOffset for Trigger.AvailableNow is {}",
lastOffsetForTriggerAvailableNow.json());
+
+ if (planner != null) {
+ planner.stop();
+ planner = null;
+ }
}
private static class InitialOffsetStore {
@@ -576,7 +283,7 @@ public class SparkMicroBatchStream implements
MicroBatchStream, SupportsTriggerA
}
table.refresh();
- StreamingOffset offset = determineStartingOffset(table, fromTimestamp);
+ StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table,
fromTimestamp);
OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
writeOffset(offset, outputFile);
diff --git
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..f1b0029c54
--- /dev/null
+++
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SyncSparkMicroBatchPlanner.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.MicroBatches;
+import org.apache.iceberg.MicroBatches.MicroBatch;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkReadConf;
+import org.apache.iceberg.util.Pair;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link SparkMicroBatchPlanner} that does all planning inline on the calling thread. Every {@link
+ * #latestOffset} call refreshes the table and walks the manifests of the snapshots it advances
+ * over.
+ */
+class SyncSparkMicroBatchPlanner extends BaseSparkMicroBatchPlanner {
+ private static final Logger LOG =
LoggerFactory.getLogger(SyncSparkMicroBatchPlanner.class);
+
+ private final boolean caseSensitive;
+ // Stream start timestamp in millis; MicroBatchUtils.determineStartingOffset treats
+ // Long.MIN_VALUE as "no start timestamp".
+ private final long fromTimestamp;
+ // Snapshot cap for Trigger.AvailableNow, or null when latestOffset is not capped.
+ private final StreamingOffset lastOffsetForTriggerAvailableNow;
+
+ /**
+ * Creates a synchronous planner for {@code table}.
+ *
+ * @param table the table to plan over
+ * @param readConf read options; supplies case sensitivity and the stream start timestamp
+ * @param lastOffsetForTriggerAvailableNow if non-null, {@link #latestOffset} stops advancing once
+ * it reaches this offset's snapshot
+ */
+ SyncSparkMicroBatchPlanner(
+ Table table, SparkReadConf readConf, StreamingOffset
lastOffsetForTriggerAvailableNow) {
+ super(table, readConf);
+ this.caseSensitive = readConf().caseSensitive();
+ this.fromTimestamp = readConf().streamFromTimestamp();
+ this.lastOffsetForTriggerAvailableNow = lastOffsetForTriggerAvailableNow;
+ }
+
+ /**
+ * Collects the file scan tasks from {@code startOffset} up to {@code endOffset}, one snapshot at
+ * a time. The end offset's position is exclusive.
+ *
+ * <p>When {@code startOffset} is {@link StreamingOffset#START_OFFSET}, the real start is resolved
+ * with {@link MicroBatchUtils#determineStartingOffset}. Snapshots rejected by {@code
+ * shouldProcess} contribute no tasks. Each snapshot before the end snapshot is read from its
+ * offset position up to its added-file count. The end snapshot is read up to {@code
+ * endOffset.position()}.
+ *
+ * @throws IllegalStateException if a snapshot in the range has been expired or removed
+ */
+ @Override
+ public List<FileScanTask> planFiles(StreamingOffset startOffset,
StreamingOffset endOffset) {
+ List<FileScanTask> fileScanTasks = Lists.newArrayList();
+ StreamingOffset batchStartOffset =
+ StreamingOffset.START_OFFSET.equals(startOffset)
+ ? MicroBatchUtils.determineStartingOffset(table(), fromTimestamp)
+ : startOffset;
+
+ StreamingOffset currentOffset = null;
+
+ // [(startOffset : startFileIndex), (endOffset : endFileIndex) )
+ do {
+ long endFileIndex;
+ if (currentOffset == null) {
+ currentOffset = batchStartOffset;
+ } else {
+ Snapshot snapshotAfter = SnapshotUtil.snapshotAfter(table(),
currentOffset.snapshotId());
+ // NOTE(review): the do-while condition guarantees currentOffset.snapshotId() differs from
+ // endOffset.snapshotId() when we get here, so the else branch below looks unreachable.
+ // The end snapshot is read partially via endFileIndex further down instead.
+ if (currentOffset.snapshotId() != endOffset.snapshotId()) {
+ currentOffset = new StreamingOffset(snapshotAfter.snapshotId(), 0L,
false);
+ } else {
+ currentOffset = endOffset;
+ }
+ }
+
+ Snapshot snapshot = table().snapshot(currentOffset.snapshotId());
+
+ validateCurrentSnapshotExists(snapshot, currentOffset);
+
+ // `continue` in a do-while jumps to the loop condition, so a skipped end snapshot still
+ // terminates the loop.
+ if (!shouldProcess(snapshot)) {
+ LOG.debug("Skipping snapshot: {} of table {}",
currentOffset.snapshotId(), table().name());
+ continue;
+ }
+
+ // NOTE(review): same lookup as `snapshot` above; it could be reused.
+ Snapshot currentSnapshot = table().snapshot(currentOffset.snapshotId());
+ if (currentOffset.snapshotId() == endOffset.snapshotId()) {
+ endFileIndex = endOffset.position();
+ } else {
+ endFileIndex = MicroBatchUtils.addedFilesCount(table(),
currentSnapshot);
+ }
+
+ MicroBatch latestMicroBatch =
+ MicroBatches.from(currentSnapshot, table().io())
+ .caseSensitive(caseSensitive)
+ .specsById(table().specs())
+ .generate(
+ currentOffset.position(),
+ endFileIndex,
+ Long.MAX_VALUE,
+ currentOffset.shouldScanAllFiles());
+
+ fileScanTasks.addAll(latestMicroBatch.tasks());
+ } while (currentOffset.snapshotId() != endOffset.snapshotId());
+
+ return fileScanTasks;
+ }
+
+ /**
+ * Computes how far the stream can advance from {@code startOffset} under {@code limit}.
+ *
+ * <p>Refreshes the table first. Returns {@link StreamingOffset#START_OFFSET} if the table has no
+ * current snapshot, or if the current snapshot is older than the configured start timestamp.
+ *
+ * <p>Otherwise it walks forward snapshot by snapshot, counting files. It stops before any file
+ * that would exceed the file cap. It stops after the file that makes the cumulative record count
+ * reach the row cap, so the row cap is soft. The walk ends at the table's current snapshot, or at
+ * the Trigger.AvailableNow cap snapshot if one was given. It also ends when no further
+ * processable snapshot exists.
+ *
+ * @return the new end offset (its position is exclusive), or {@code null} if it equals the
+ * starting offset, i.e. there is no new data
+ * @throws IllegalStateException if the starting snapshot has been expired or removed
+ */
+ @Override
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ public StreamingOffset latestOffset(StreamingOffset startOffset, ReadLimit
limit) {
+ table().refresh();
+ if (table().currentSnapshot() == null) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ if (table().currentSnapshot().timestampMillis() < fromTimestamp) {
+ return StreamingOffset.START_OFFSET;
+ }
+
+ // end offset can expand to multiple snapshots
+ StreamingOffset startingOffset = startOffset;
+
+ if (startOffset.equals(StreamingOffset.START_OFFSET)) {
+ startingOffset = MicroBatchUtils.determineStartingOffset(table(),
fromTimestamp);
+ }
+
+ Snapshot curSnapshot = table().snapshot(startingOffset.snapshotId());
+ validateCurrentSnapshotExists(curSnapshot, startingOffset);
+
+ // Use the pre-computed snapshotId when Trigger.AvailableNow is enabled.
+ long latestSnapshotId =
+ lastOffsetForTriggerAvailableNow != null
+ ? lastOffsetForTriggerAvailableNow.snapshotId()
+ : table().currentSnapshot().snapshotId();
+
+ // NOTE(review): narrowing cast; a position above Integer.MAX_VALUE would be truncated.
+ int startPosOfSnapOffset = (int) startingOffset.position();
+
+ boolean scanAllFiles = startingOffset.shouldScanAllFiles();
+
+ boolean shouldContinueReading = true;
+ int curFilesAdded = 0;
+ long curRecordCount = 0;
+ int curPos = 0;
+
+ // Extract limits once to avoid repeated calls in tight loop
+ UnpackedLimits unpackedLimits = new UnpackedLimits(limit);
+ long maxFiles = unpackedLimits.getMaxFiles();
+ long maxRows = unpackedLimits.getMaxRows();
+
+ // Note : we produce nextOffset with pos as non-inclusive
+ while (shouldContinueReading) {
+ // generate manifest index for the curSnapshot
+ List<Pair<ManifestFile, Integer>> indexedManifests =
+ MicroBatches.skippedManifestIndexesFromSnapshot(
+ table().io(), curSnapshot, startPosOfSnapOffset, scanAllFiles);
+ // this is under assumption we will be able to add at-least 1 file in
the new offset
+ for (int idx = 0; idx < indexedManifests.size() &&
shouldContinueReading; idx++) {
+ // be rest assured curPos >= startFileIndex
+ curPos = indexedManifests.get(idx).second();
+ try (CloseableIterable<FileScanTask> taskIterable =
+ MicroBatches.openManifestFile(
+ table().io(),
+ table().specs(),
+ caseSensitive,
+ curSnapshot,
+ indexedManifests.get(idx).first(),
+ scanAllFiles);
+ CloseableIterator<FileScanTask> taskIter =
taskIterable.iterator()) {
+ while (taskIter.hasNext()) {
+ FileScanTask task = taskIter.next();
+ if (curPos >= startPosOfSnapOffset) {
+ if ((curFilesAdded + 1) > maxFiles) {
+ // Adding this file would exceed the file cap, so stop before it. Unlike the
+ // row cap below, the file cap is never exceeded.
+ shouldContinueReading = false;
+ break;
+ }
+
+ curFilesAdded += 1;
+ curRecordCount += task.file().recordCount();
+
+ if (curRecordCount >= maxRows) {
+ // we included the file, so increment the number of files
+ // read in the current snapshot.
+ if (curFilesAdded == 1 && curRecordCount > maxRows) {
+ LOG.warn(
+ "File {} contains {} records, exceeding
maxRecordsPerMicroBatch limit of {}. "
+ + "This file will be processed entirely to guarantee
forward progress. "
+ + "Consider increasing the limit or writing smaller
files to avoid unexpected memory usage.",
+ task.file().location(),
+ task.file().recordCount(),
+ maxRows);
+ }
+ ++curPos;
+ shouldContinueReading = false;
+ break;
+ }
+ }
+ ++curPos;
+ }
+ } catch (IOException ioe) {
+ // Close failures are only logged; files already counted are kept.
+ LOG.warn("Failed to close task iterable", ioe);
+ }
+ }
+ // Stop once the latest snapshot (or the Trigger.AvailableNow cap snapshot) has been read.
+ if (curSnapshot.snapshotId() == latestSnapshotId) {
+ break;
+ }
+
+ // if everything was OK and we consumed complete snapshot then move to
next snapshot
+ if (shouldContinueReading) {
+ Snapshot nextValid = nextValidSnapshot(curSnapshot);
+ if (nextValid == null) {
+ // null means all the remaining snapshots should be skipped.
+ break;
+ }
+ // we found the next available snapshot, continue from there.
+ curSnapshot = nextValid;
+ // NOTE(review): presumably -1 so that no file of the next snapshot is filtered out by the
+ // curPos >= startPosOfSnapOffset check; confirm against skippedManifestIndexesFromSnapshot.
+ startPosOfSnapOffset = -1;
+ // if anyhow we are moving to next snapshot we should only scan
addedFiles
+ scanAllFiles = false;
+ }
+ }
+
+ StreamingOffset latestStreamingOffset =
+ new StreamingOffset(curSnapshot.snapshotId(), curPos, scanAllFiles);
+
+ // if no new data arrived, then return null.
+ return latestStreamingOffset.equals(startingOffset) ? null :
latestStreamingOffset;
+ }
+
+ /** No-op: this planner holds no threads or other resources that need releasing. */
+ @Override
+ public void stop() {}
+
+ /**
+ * Throws if {@code snapshot} is null, i.e. the snapshot referenced by {@code currentOffset} was
+ * not found in the table.
+ *
+ * @throws IllegalStateException if {@code snapshot} is null
+ */
+ private void validateCurrentSnapshotExists(Snapshot snapshot,
StreamingOffset currentOffset) {
+ if (snapshot == null) {
+ throw new IllegalStateException(
+ String.format(
+ Locale.ROOT,
+ "Cannot load current offset at snapshot %d, the snapshot was
expired or removed",
+ currentOffset.snapshotId()));
+ }
+ }
+}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java
new file mode 100644
index 0000000000..b6017e2001
--- /dev/null
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestAsyncSparkMicroBatchPlanner.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.iceberg.Snapshot;
+import org.junit.jupiter.api.Test;
+
+/** Unit tests for {@link AsyncSparkMicroBatchPlanner#reachedAvailableNowCap}. */
+class TestAsyncSparkMicroBatchPlanner {
+
+  @Test
+  void reachedAvailableNowCapReturnsTrueOnlyForExactCapSnapshot() {
+    Snapshot atCap = snapshotWithId(10L);
+    Snapshot higherId = snapshotWithId(20L);
+    Snapshot lowerId = snapshotWithId(5L);
+    StreamingOffset cap = new StreamingOffset(10L, 3L, false);
+
+    // Only the snapshot whose id equals the cap's snapshot id reaches the cap; neither a higher
+    // nor a lower id may be treated as reaching it.
+    assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(atCap, cap)).isTrue();
+    assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(higherId, cap)).isFalse();
+    assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(lowerId, cap)).isFalse();
+  }
+
+  @Test
+  void reachedAvailableNowCapReturnsFalseWhenCapOrSnapshotIsMissing() {
+    Snapshot snapshot = snapshotWithId(10L);
+    StreamingOffset cap = new StreamingOffset(10L, 1L, false);
+
+    // A missing cap or a missing snapshot must never count as reaching the cap.
+    assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(snapshot, null)).isFalse();
+    assertThat(AsyncSparkMicroBatchPlanner.reachedAvailableNowCap(null, cap)).isFalse();
+  }
+
+  /** Creates a Mockito {@link Snapshot} stub that answers only {@code snapshotId()}. */
+  private static Snapshot snapshotWithId(long snapshotId) {
+    Snapshot snapshot = mock(Snapshot.class);
+    when(snapshot.snapshotId()).thenReturn(snapshotId);
+    return snapshot;
+  }
+}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
new file mode 100644
index 0000000000..a9ce340fd4
--- /dev/null
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestMicroBatchPlanningUtils.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.source;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.SnapshotSummary;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.spark.sql.connector.read.streaming.ReadLimit;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Tests for {@link MicroBatchUtils} and {@link BaseSparkMicroBatchPlanner.UnpackedLimits}. */
+@ExtendWith(ParameterizedTestExtension.class)
+public class TestMicroBatchPlanningUtils extends CatalogTestBase {
+
+  private Table table;
+
+  @BeforeEach
+  public void setupTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql(
+        "CREATE TABLE %s "
+            + "(id INT, data STRING) "
+            + "USING iceberg "
+            + "PARTITIONED BY (bucket(3, id))",
+        tableName);
+    this.table = validationCatalog.loadTable(tableIdent);
+  }
+
+  @AfterEach
+  public void dropTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @TestTemplate
+  public void testUnpackedLimitsCompositeChoosesMinimum() {
+    ReadLimit[] limits =
+        new ReadLimit[] {
+          ReadLimit.maxRows(10), ReadLimit.maxRows(4), ReadLimit.maxFiles(8), ReadLimit.maxFiles(2)
+        };
+
+    ReadLimit composite = ReadLimit.compositeLimit(limits);
+
+    BaseSparkMicroBatchPlanner.UnpackedLimits unpacked =
+        new BaseSparkMicroBatchPlanner.UnpackedLimits(composite);
+
+    // When a composite repeats a limit type, the smallest value of each type is expected to win.
+    assertThat(unpacked.getMaxRows()).isEqualTo(4);
+    assertThat(unpacked.getMaxFiles()).isEqualTo(2);
+  }
+
+  @TestTemplate
+  public void testDetermineStartingOffsetWithTimestampBetweenSnapshots() {
+    sql("INSERT INTO %s VALUES (1, 'one')", tableName);
+    table.refresh();
+    long snapshot1Time = table.currentSnapshot().timestampMillis();
+
+    // The start timestamp below is snapshot1Time + 1. That only falls between the two snapshots
+    // when the second commit is stamped strictly later than the first, which two back-to-back
+    // commits in the same millisecond would violate. Wait for the clock to move on first.
+    // NOTE(review): assumes snapshot timestamps come from this JVM's wall clock.
+    waitForClockToPass(snapshot1Time);
+
+    sql("INSERT INTO %s VALUES (2, 'two')", tableName);
+    table.refresh();
+    long snapshot2Id = table.currentSnapshot().snapshotId();
+
+    StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, snapshot1Time + 1);
+
+    assertThat(offset.snapshotId()).isEqualTo(snapshot2Id);
+    assertThat(offset.position()).isEqualTo(0L);
+    assertThat(offset.shouldScanAllFiles()).isFalse();
+  }
+
+  @TestTemplate
+  public void testAddedFilesCountUsesSummaryWhenPresent() {
+    sql("INSERT INTO %s VALUES (1, 'one')", tableName);
+    table.refresh();
+
+    long expectedAddedFiles =
+        Long.parseLong(table.currentSnapshot().summary().get(SnapshotSummary.ADDED_FILES_PROP));
+
+    long actual = MicroBatchUtils.addedFilesCount(table, table.currentSnapshot());
+
+    assertThat(actual).isEqualTo(expectedAddedFiles);
+  }
+
+  /** Busy-waits until {@link System#currentTimeMillis()} is strictly greater than the argument. */
+  private static void waitForClockToPass(long timestampMillis) {
+    while (System.currentTimeMillis() <= timestampMillis) {
+      Thread.onSpinWait();
+    }
+  }
+}
diff --git
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
index 2f73d51f04..2b2be95c5c 100644
---
a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
+++
b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestStructuredStreamingRead3.java
@@ -31,13 +31,17 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DataOperations;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
+import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -50,15 +54,22 @@ import org.apache.iceberg.data.FileHelpers;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.CatalogTestBase;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.SparkReadConf;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
@@ -73,10 +84,73 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith(ParameterizedTestExtension.class)
public final class TestStructuredStreamingRead3 extends CatalogTestBase {
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}, async = {3}")
+  public static Object[][] parameters() {
+    // One row per catalog: {catalogName, implementation, config}.
+    Object[][] catalogs = {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        SparkCatalogConfig.HIVE.properties()
+      },
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        SparkCatalogConfig.HADOOP.properties()
+      },
+      {
+        SparkCatalogConfig.REST.catalogName(),
+        SparkCatalogConfig.REST.implementation(),
+        ImmutableMap.builder()
+            .putAll(SparkCatalogConfig.REST.properties())
+            .put(CatalogProperties.URI, restCatalog.properties().get(CatalogProperties.URI))
+            .build()
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        SparkCatalogConfig.SPARK.properties()
+      }
+    };
+
+    // Every catalog runs twice, first with synchronous and then with asynchronous micro-batch
+    // planning; the flag becomes parameter index 3.
+    List<Object[]> rows = Lists.newArrayList();
+    for (Object[] catalog : catalogs) {
+      for (boolean asyncPlanning : new boolean[] {false, true}) {
+        rows.add(new Object[] {catalog[0], catalog[1], catalog[2], asyncPlanning});
+      }
+    }
+
+    return rows.toArray(new Object[0][]);
+  }
+
private Table table;
private final AtomicInteger microBatches = new AtomicInteger();
+  /**
+   * Whether async micro-batch planning is enabled for this run. Injected from index 3 of {@link
+   * #parameters()} and forwarded as {@code SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED}.
+   */
+ @Parameter(index = 3)
+ private Boolean async;
+
/**
* test data to be used by multiple writes each write creates a snapshot and
writes a list of
* records
@@ -158,7 +232,6 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
@TestTemplate
public void testReadStreamWithMaxFiles1() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
assertMicroBatchRecordSizes(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
List.of(1L, 2L, 1L, 1L, 1L, 1L));
@@ -172,7 +245,6 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
@TestTemplate
public void testReadStreamWithMaxFiles2() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
assertMicroBatchRecordSizes(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"2"),
List.of(3L, 2L, 2L));
@@ -206,7 +278,6 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
@TestTemplate
public void testReadStreamWithMaxRows2() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
assertMicroBatchRecordSizes(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"2"),
List.of(3L, 2L, 2L));
@@ -227,7 +298,6 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
@TestTemplate
public void testReadStreamWithMaxRows4() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
assertMicroBatchRecordSizes(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"4"), List.of(4L, 3L));
@@ -240,13 +310,10 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
@TestTemplate
public void testReadStreamWithCompositeReadLimit() throws Exception {
appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
-
assertMicroBatchRecordSizes(
ImmutableMap.of(
- SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
- "4",
- SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
- "1"),
+ SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+ SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH, "2"),
List.of(1L, 2L, 1L, 1L, 1L, 1L));
assertMicroBatchRecordSizes(
@@ -257,15 +324,41 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
Trigger.AvailableNow());
}
+  @TestTemplate
+  public void testReadStreamWithLowAsyncQueuePreload() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    // Cap the async preload queue at 5 rows / 5 files so the background thread has to keep
+    // loading the remaining data after the initial preload.
+    Map<String, String> lowPreloadLimits =
+        ImmutableMap.of(
+            SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT, "5",
+            SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT, "5");
+    StreamingQuery query = startStream(lowPreloadLimits);
+
+    List<SimpleRecord> actual = rowsAvailable(query);
+    Iterable<SimpleRecord> expected = Iterables.concat(TEST_DATA_MULTIPLE_SNAPSHOTS);
+    assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+  }
+
@TestTemplate
public void testAvailableNowStreamReadShouldNotHangOrReprocessData() throws
Exception {
File writerCheckpointFolder =
temp.resolve("writer-checkpoint-folder").toFile();
File writerCheckpoint = new File(writerCheckpointFolder,
"writer-checkpoint");
File output = temp.resolve("junit").toFile();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
async.toString());
+ if (async) {
+ options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
"1");
+ }
+
DataStreamWriter querySource =
spark
.readStream()
+ .options(options)
.format("iceberg")
.load(tableName)
.writeStream()
@@ -320,10 +413,17 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
long expectedSnapshotId = table.currentSnapshot().snapshotId();
String sinkTable = "availablenow_sink";
+ Map<String, String> options = Maps.newHashMap();
+ options.put(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1");
+ options.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
async.toString());
+ if (async) {
+ options.put(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
"1");
+ }
+
StreamingQuery query =
spark
.readStream()
- .option(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1")
+ .options(options)
.format("iceberg")
.load(tableName)
.writeStream()
@@ -365,6 +465,142 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
assertThat(actualResults).containsExactlyInAnyOrderElementsOf(Iterables.concat(expectedData));
}
+  @TestTemplate
+  public void testTriggerAvailableNowCapsAsyncPreloadAfterPrepare() {
+    List<List<SimpleRecord>> initialData =
+        List.of(List.of(new SimpleRecord(1, "one")), List.of(new SimpleRecord(2, "two")));
+    appendDataAsMultipleSnapshots(initialData);
+
+    table.refresh();
+    // prepareForTriggerAvailableNow() is expected to pin the stream to this snapshot.
+    long expectedCapSnapshotId = table.currentSnapshot().snapshotId();
+
+    // newMicroBatchStream adds this run's async flag; the polling interval is pinned to 1 ms here
+    // for both sync and async runs, exactly as before.
+    SparkMicroBatchStream stream =
+        newMicroBatchStream(
+            ImmutableMap.of(
+                SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1",
+                SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, "1",
+                SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT, "10",
+                SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT, "10"),
+            "available-now-cap-checkpoint");
+
+    try {
+      stream.prepareForTriggerAvailableNow();
+
+      // Committed after prepare: must not be picked up by this AvailableNow run.
+      appendData(List.of(new SimpleRecord(3, "three")));
+
+      Offset startOffset = stream.initialOffset();
+      Offset firstEndOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit());
+      assertThat(firstEndOffset).isNotNull();
+      stream.planInputPartitions(startOffset, firstEndOffset);
+
+      Offset secondEndOffset = stream.latestOffset(firstEndOffset, stream.getDefaultReadLimit());
+      assertThat(secondEndOffset).isNotNull();
+      stream.planInputPartitions(firstEndOffset, secondEndOffset);
+
+      // Both pre-prepare snapshots are consumed; the post-prepare append must not yield a batch.
+      assertThat(stream.latestOffset(secondEndOffset, stream.getDefaultReadLimit())).isNull();
+      assertThat(((StreamingOffset) secondEndOffset).snapshotId()).isEqualTo(expectedCapSnapshotId);
+    } finally {
+      stream.stop();
+    }
+  }
+
+  @TestTemplate
+  public void testLatestOffsetReturnsNullAfterFinalBatchIsConsumed() throws Exception {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    // With at most one file per micro-batch, expect one batch per planned data file.
+    table.refresh();
+    int expectedBatchCount;
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      expectedBatchCount = Iterables.size(tasks);
+    }
+
+    SparkMicroBatchStream stream =
+        newMicroBatchStream(
+            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"),
+            "drain-to-null-checkpoint");
+
+    try {
+      int plannedBatchCount = 0;
+      Offset startOffset = stream.initialOffset();
+      Offset endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit());
+      // Stop one batch past the expected count, so a planner that never returns null fails the
+      // assertions below instead of looping forever.
+      while (endOffset != null && plannedBatchCount <= expectedBatchCount) {
+        InputPartition[] partitions = stream.planInputPartitions(startOffset, endOffset);
+        assertThat(partitions).isNotEmpty();
+        plannedBatchCount += 1;
+        startOffset = endOffset;
+        endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit());
+      }
+
+      assertThat(endOffset)
+          .as("latestOffset should return null once every batch has been planned")
+          .isNull();
+      assertThat(plannedBatchCount).isEqualTo(expectedBatchCount);
+    } finally {
+      stream.stop();
+    }
+  }
+
+  @TestTemplate
+  public void testPlanInputPartitionsIsIdempotentForSameOffsets() {
+    appendDataAsMultipleSnapshots(TEST_DATA_MULTIPLE_SNAPSHOTS);
+
+    SparkMicroBatchStream stream =
+        newMicroBatchStream(
+            ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH, "1"),
+            "idempotent-plan-files-checkpoint");
+
+    try {
+      Offset startOffset = stream.initialOffset();
+      Offset endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit());
+
+      assertThat(endOffset).isNotNull();
+
+      // Planning the same offset range twice must yield the same data files.
+      InputPartition[] firstPartitions = stream.planInputPartitions(startOffset, endOffset);
+      InputPartition[] secondPartitions = stream.planInputPartitions(startOffset, endOffset);
+
+      List<String> firstFileLocations = scanTaskFileLocations(firstPartitions);
+      List<String> secondFileLocations = scanTaskFileLocations(secondPartitions);
+
+      // Without this, two empty plans would make the comparison below pass vacuously.
+      assertThat(firstFileLocations).isNotEmpty();
+      assertThat(firstFileLocations).containsExactlyInAnyOrderElementsOf(secondFileLocations);
+
+      // After the repeated plan, the remaining batches should still drain to null. The bound is
+      // far above the handful of batches TEST_DATA_MULTIPLE_SNAPSHOTS yields at one file per
+      // batch; it turns a planner that never reports "no new data" into a failure, not a hang.
+      int maxRemainingBatches = 100;
+      int drainedBatches = 0;
+      startOffset = endOffset;
+      endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit());
+      while (endOffset != null && drainedBatches < maxRemainingBatches) {
+        assertThat(stream.planInputPartitions(startOffset, endOffset)).isNotEmpty();
+        drainedBatches += 1;
+        startOffset = endOffset;
+        endOffset = stream.latestOffset(startOffset, stream.getDefaultReadLimit());
+      }
+
+      assertThat(endOffset)
+          .as("latestOffset should return null once every batch has been planned")
+          .isNull();
+    } finally {
+      stream.stop();
+    }
+  }
+
+  /** Returns the data file location of every {@link FileScanTask} in the given partitions. */
+  private static List<String> scanTaskFileLocations(InputPartition[] partitions) {
+    List<String> locations = Lists.newArrayList();
+    for (InputPartition partition : partitions) {
+      SparkInputPartition sparkInputPartition = (SparkInputPartition) partition;
+      for (FileScanTask task : sparkInputPartition.<FileScanTask>taskGroup().tasks()) {
+        locations.add(task.file().location());
+      }
+    }
+    return locations;
+  }
+
@TestTemplate
public void testReadStreamOnIcebergThenAddData() throws Exception {
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
@@ -394,7 +630,7 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP,
Long.toString(streamStartTimestamp));
List<SimpleRecord> empty = rowsAvailable(query);
- assertThat(empty.isEmpty()).isTrue();
+ assertThat(empty).isEmpty();
List<List<SimpleRecord>> expected = TEST_DATA_MULTIPLE_SNAPSHOTS;
appendDataAsMultipleSnapshots(expected);
@@ -412,7 +648,7 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
startStream(SparkReadOptions.STREAM_FROM_TIMESTAMP,
Long.toString(futureTimestamp));
List<SimpleRecord> actual = rowsAvailable(query);
- assertThat(actual.isEmpty()).isTrue();
+ assertThat(actual).isEmpty();
List<SimpleRecord> data =
Lists.newArrayList(
@@ -425,13 +661,15 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
.forEach(
x -> {
appendData(data);
- assertThat(rowsAvailable(query).isEmpty()).isTrue();
+ assertThat(rowsAvailable(query)).isEmpty();
});
waitUntilAfter(futureTimestamp);
// Data appended after the timestamp should appear
appendData(data);
+ // Allow async background thread to refresh, else test sometimes fails
+ Thread.sleep(50);
actual = rowsAvailable(query);
assertThat(actual).containsExactlyInAnyOrderElementsOf(data);
}
@@ -674,7 +912,6 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
appendDataAsMultipleSnapshots(expected);
makeRewriteDataFiles();
-
assertMicroBatchRecordSizes(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
List.of(1L, 2L, 1L, 1L, 1L, 1L));
@@ -688,10 +925,8 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
appendDataAsMultipleSnapshots(expected);
makeRewriteDataFiles();
-
assertMicroBatchRecordSizes(
- ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"4"),
- List.of(5L, 2L));
+ ImmutableMap.of(SparkReadOptions.STREAMING_MAX_ROWS_PER_MICRO_BATCH,
"4"), List.of(4L, 3L));
}
@TestTemplate
@@ -719,7 +954,6 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
makeRewriteDataFiles();
-
assertMicroBatchRecordSizes(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
List.of(1L, 2L, 1L, 1L, 1L, 1L));
@@ -735,7 +969,6 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
makeRewriteDataFiles();
appendDataAsMultipleSnapshots(expected);
-
assertMicroBatchRecordSizes(
ImmutableMap.of(SparkReadOptions.STREAMING_MAX_FILES_PER_MICRO_BATCH,
"1"),
List.of(1L, 2L, 1L, 1L, 1L, 1L, 1L, 2L, 1L, 1L, 1L, 1L));
@@ -884,13 +1117,18 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
private static final String MEMORY_TABLE = "_stream_view_mem";
private StreamingQuery startStream(Map<String, String> options) throws
TimeoutException {
+ Map<String, String> allOptions = Maps.newHashMap(options);
+ allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
async.toString());
+ if (async) {
+
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
"1");
+ }
return spark
.readStream()
- .options(options)
+ .options(allOptions)
.format("iceberg")
.load(tableName)
.writeStream()
- .options(options)
+ .options(allOptions)
.format("memory")
.queryName(MEMORY_TABLE)
.outputMode(OutputMode.Append())
@@ -915,11 +1153,17 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
private void assertMicroBatchRecordSizes(
Map<String, String> options, List<Long> expectedMicroBatchRecordSize,
Trigger trigger)
throws TimeoutException {
- Dataset<Row> ds =
spark.readStream().options(options).format("iceberg").load(tableName);
- List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
+ Map<String, String> allOptions = Maps.newHashMap(options);
+ allOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED,
async.toString());
+ if (async) {
+
allOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS,
"1");
+ }
+
+ Dataset<Row> ds =
spark.readStream().options(allOptions).format("iceberg").load(tableName);
+ List<Long> syncList = Collections.synchronizedList(Lists.newArrayList());
ds.writeStream()
- .options(options)
+ .options(allOptions)
.trigger(trigger)
.foreachBatch(
(VoidFunction2<Dataset<Row>, Long>)
@@ -941,4 +1185,21 @@ public final class TestStructuredStreamingRead3 extends
CatalogTestBase {
.as(Encoders.bean(SimpleRecord.class))
.collectAsList();
}
+
+  /**
+   * Constructs a {@link SparkMicroBatchStream} over {@link #table} directly. Tests can then drive
+   * {@code initialOffset}, {@code latestOffset} and {@code planInputPartitions} by hand, with this
+   * run's async-planning flag layered on top of {@code options}.
+   *
+   * @param options read options for the stream; copied, the caller's map is not modified
+   * @param checkpointDirName checkpoint directory name, resolved under {@code temp}
+   * @return a new stream; callers are responsible for calling {@code stop()}
+   */
+  private SparkMicroBatchStream newMicroBatchStream(
+      Map<String, String> options, String checkpointDirName) {
+    Map<String, String> readOptions = Maps.newHashMap(options);
+    readOptions.put(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED, async.toString());
+    if (async) {
+      // Poll for new snapshots every millisecond unless the caller chose an interval.
+      readOptions.putIfAbsent(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS, "1");
+    }
+
+    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    SparkReadConf readConf = new SparkReadConf(spark, table, readOptions);
+    String checkpointLocation = temp.resolve(checkpointDirName).toString();
+    return new SparkMicroBatchStream(
+        sparkContext, table, table::io, readConf, table.schema(), checkpointLocation);
+  }
}