This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 74a1160743 Spark 4.0, Core: Add Limit pushdown to Scan (#14615)
74a1160743 is described below
commit 74a11607435ec332e557323b78c51726a7ef2fdf
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Dec 3 07:32:55 2025 +0100
Spark 4.0, Core: Add Limit pushdown to Scan (#14615)
---
.../java/org/apache/iceberg/BatchScanAdapter.java | 5 ++
api/src/main/java/org/apache/iceberg/Scan.java | 13 +++
.../src/main/java/org/apache/iceberg/BaseScan.java | 5 ++
.../java/org/apache/iceberg/TableScanContext.java | 7 ++
.../iceberg/spark/source/SparkScanBuilder.java | 15 +++-
.../iceberg/spark/source/TestFilteredScan.java | 94 ++++++++++++++++++++++
.../org/apache/iceberg/spark/sql/TestSelect.java | 13 +++
7 files changed, 151 insertions(+), 1 deletion(-)
diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index 02b3d241d8..d8c5dda885 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -151,4 +151,9 @@ class BatchScanAdapter implements BatchScan {
public BatchScan metricsReporter(MetricsReporter reporter) {
return new BatchScanAdapter(scan.metricsReporter(reporter));
}
+
+ @Override
+ public BatchScan minRowsRequested(long numRows) {
+ return new BatchScanAdapter(scan.minRowsRequested(numRows));
+ }
}
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index 339bc75336..9785ce6603 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -195,4 +195,17 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
throw new UnsupportedOperationException(
this.getClass().getName() + " doesn't implement metricsReporter");
}
+
+ /**
+ * Create a new scan that returns files with at least the given number of rows. This is used as a
+ * hint and is entirely optional, so the scan does not have to return more rows than necessary. The
+ * scan may return fewer rows if it does not contain that many, or it may return more than requested.
+ *
+ * @param numRows The minimum number of rows requested
+ * @return A new scan based on this with at least the given number of rows
+ */
+ default ThisT minRowsRequested(long numRows) {
+ throw new UnsupportedOperationException(
+ this.getClass().getName() + " doesn't implement minRowsRequested");
+ }
}
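
For context, a minimal sketch of how a caller could use the new hint through the core API (the table, column names, and output below are placeholders; as the javadoc says, the hint is best-effort and planning may still cover more or fewer rows):

import java.io.IOException;
import org.apache.iceberg.CombinedScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.io.CloseableIterable;

public class MinRowsRequestedSketch {
  // Plan a scan that hints it only needs about numRows rows; the planner may still
  // include more (or fewer) rows than requested, per Scan#minRowsRequested.
  static void planWithRowHint(Table table, long numRows) throws IOException {
    TableScan scan = table.newScan().select("id", "data").minRowsRequested(numRows);
    try (CloseableIterable<CombinedScanTask> tasks = scan.planTasks()) {
      tasks.forEach(task -> System.out.println(task.files().size() + " files in task"));
    }
  }
}
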
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 618b2e95f2..e40b1b743c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -293,6 +293,11 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
return newRefinedScan(table, schema, context.reportWith(reporter));
}
+ @Override
+ public ThisT minRowsRequested(long numRows) {
+ return newRefinedScan(table, schema, context.minRowsRequested(numRows));
+ }
+
/**
* Retrieves a list of column names based on the type of manifest content provided.
*
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index 5722ed7d8c..faa1c264d5 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -103,6 +103,9 @@ abstract class TableScanContext {
@Nullable
public abstract String branch();
+ @Nullable
+ public abstract Long minRowsRequested();
+
TableScanContext useSnapshotId(Long scanSnapshotId) {
return ImmutableTableScanContext.builder().from(this).snapshotId(scanSnapshotId).build();
}
@@ -193,6 +196,10 @@ abstract class TableScanContext {
return ImmutableTableScanContext.builder().from(this).branch(ref).build();
}
+ TableScanContext minRowsRequested(long numRows) {
+ return ImmutableTableScanContext.builder().from(this).minRowsRequested(numRows).build();
+ }
+
public static TableScanContext empty() {
return ImmutableTableScanContext.builder().build();
}
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 968e6eeaaa..11b0ba58af 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -71,6 +71,7 @@ import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.read.Statistics;
import org.apache.spark.sql.connector.read.SupportsPushDownAggregates;
+import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
import org.apache.spark.sql.connector.read.SupportsReportStatistics;
@@ -85,7 +86,8 @@ public class SparkScanBuilder
SupportsPushDownAggregates,
SupportsPushDownV2Filters,
SupportsPushDownRequiredColumns,
- SupportsReportStatistics {
+ SupportsReportStatistics,
+ SupportsPushDownLimit {
private static final Logger LOG = LoggerFactory.getLogger(SparkScanBuilder.class);
private static final Predicate[] NO_PREDICATES = new Predicate[0];
@@ -102,6 +104,7 @@ public class SparkScanBuilder
private boolean caseSensitive;
private List<Expression> filterExpressions = null;
private Predicate[] pushedPredicates = NO_PREDICATES;
+ private Integer limit = null;
SparkScanBuilder(
SparkSession spark,
@@ -739,6 +742,10 @@ public class SparkScanBuilder
TableProperties.SPLIT_OPEN_FILE_COST,
String.valueOf(splitOpenFileCost));
}
+ if (null != limit) {
+ configuredScan = configuredScan.minRowsRequested(limit.longValue());
+ }
+
return configuredScan;
}
@@ -759,4 +766,10 @@ public class SparkScanBuilder
return table.newBatchScan();
}
}
+
+ @Override
+ public boolean pushLimit(int pushedLimit) {
+ this.limit = pushedLimit;
+ return true;
+ }
}
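
As a rough sketch of how this pushdown is exercised from the engine side (the table name db.events is a placeholder): Spark's optimizer may call pushLimit() on the builder while planning a LIMIT query, and the builder forwards the value to the Iceberg scan as minRowsRequested; because isPartiallyPushed() keeps its default of true, Spark still applies the LIMIT on top of whatever rows the scan returns.

import org.apache.spark.sql.SparkSession;

public class LimitPushdownSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("limit-pushdown-sketch").getOrCreate();
    // "db.events" is a hypothetical Iceberg table. During planning, Spark may invoke
    // SparkScanBuilder.pushLimit(10); the builder then configures the underlying scan
    // with minRowsRequested(10) as a planning hint, and Spark still enforces LIMIT 10.
    spark.sql("SELECT * FROM db.events LIMIT 10").show();
    spark.stop();
  }
}
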
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 8d7053efb1..61d6501a68 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -23,6 +23,7 @@ import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
import static org.apache.iceberg.data.FileHelpers.encrypt;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
@@ -73,6 +74,7 @@ import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.StringStartsWith;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.assertj.core.api.AbstractObjectAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -267,6 +269,98 @@ public class TestFilteredScan {
"ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
}
+ @TestTemplate
+ public void limitPushedDownToSparkScan() {
+ assumeThat(fileFormat)
+ .as("no need to run this across the entire test matrix")
+ .isEqualTo(FileFormat.PARQUET);
+
+ CaseInsensitiveStringMap options =
+ new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+ SparkScanBuilder builder =
+ new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+ long limit = 23;
+ // simulate Spark pushing down the limit to the scan builder
+ builder.pushLimit((int) limit);
+ assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+ // verify batch scan
+ AbstractObjectAssert<?, ?> scanAssert = assertThat(builder.build()).extracting("scan");
+ if (LOCAL == planningMode) {
+ scanAssert = scanAssert.extracting("scan");
+ }
+
+ scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+ // verify changelog scan
+ assertThat(builder.buildChangelogScan())
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify CoW scan
+ assertThat(builder.buildCopyOnWriteScan())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify MoR scan
+ scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+ if (LOCAL == planningMode) {
+ scanAssert = scanAssert.extracting("scan");
+ }
+
+ scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+ }
+
+ @TestTemplate
+ public void limitPushedDownToSparkScanForMetadataTable() {
+ assumeThat(fileFormat)
+ .as("no need to run this across the entire test matrix")
+ .isEqualTo(FileFormat.PARQUET);
+
+ CaseInsensitiveStringMap options =
+ new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+ // load the snapshots metadata table
+ SparkScanBuilder builder =
+ new SparkScanBuilder(spark, TABLES.load(options.get("path") + "#snapshots"), options);
+
+ long limit = 23;
+ // simulate Spark pushing down the limit to the scan builder
+ builder.pushLimit((int) limit);
+ assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+ // verify batch scan
+ assertThat(builder.build())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify CoW scan
+ assertThat(builder.buildCopyOnWriteScan())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+
+ // verify MoR scan
+ assertThat(builder.buildMergeOnReadScan())
+ .extracting("scan")
+ .extracting("scan")
+ .extracting("context")
+ .extracting("minRowsRequested")
+ .isEqualTo(limit);
+ }
+
@TestTemplate
public void testBucketPartitionedIDFilters() {
Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID);
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 68b93be479..8fa8406278 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -150,6 +150,19 @@ public class TestSelect extends CatalogTestBase {
.isEqualTo("(float IS NOT NULL AND is_nan(float))");
}
+ @TestTemplate
+ public void selectWithLimit() {
+ Object[] first = row(1L, "a", 1.0F);
+ Object[] second = row(2L, "b", 2.0F);
+ Object[] third = row(3L, "c", Float.NaN);
+
+ // verify that LIMIT is properly applied in case SupportsPushDownLimit.isPartiallyPushed() is
+ // ever overridden in SparkScanBuilder
+ assertThat(sql("SELECT * FROM %s LIMIT 1", tableName)).containsExactly(first);
+ assertThat(sql("SELECT * FROM %s LIMIT 2", tableName)).containsExactly(first, second);
+ assertThat(sql("SELECT * FROM %s LIMIT 3", tableName)).containsExactly(first, second, third);
+ }
+
@TestTemplate
public void testProjection() {
List<Object[]> expected = ImmutableList.of(row(1L), row(2L), row(3L));
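
A note on the comment in selectWithLimit above: Spark's SupportsPushDownLimit.isPartiallyPushed() defaults to true, so Spark re-applies the LIMIT on top of the rows the scan returns and the pushed value remains only a planning hint. Purely as an illustration (not part of this commit), a builder that guaranteed an exact row count could opt out of that by overriding isPartiallyPushed():

import org.apache.spark.sql.connector.read.SupportsPushDownLimit;

// Illustrative only: SparkScanBuilder keeps the default isPartiallyPushed() == true,
// so the pushed limit stays a hint and Spark still enforces LIMIT itself.
abstract class ExactLimitScanBuilder implements SupportsPushDownLimit {
  private int limit = -1;

  @Override
  public boolean pushLimit(int pushedLimit) {
    this.limit = pushedLimit;
    return true;
  }

  @Override
  public boolean isPartiallyPushed() {
    // only safe to return false if the scan truly returns at most `limit` rows
    return false;
  }
}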