This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 74a1160743 Spark 4.0, Core: Add Limit pushdown to Scan (#14615)
74a1160743 is described below

commit 74a11607435ec332e557323b78c51726a7ef2fdf
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Wed Dec 3 07:32:55 2025 +0100

    Spark 4.0, Core: Add Limit pushdown to Scan (#14615)
---
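For context: this wires Spark's SupportsPushDownLimit hook up to Iceberg scan
planning, so a plain LIMIT query can stop the scan from planning more data
files than needed. A minimal sketch of the user-facing behavior (catalog and
table names are made up; an Iceberg catalog is assumed to be configured):

    import org.apache.spark.sql.SparkSession;

    class LimitQueryExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        // Spark pushes LIMIT 10 into SparkScanBuilder#pushLimit, which
        // forwards it to the Iceberg scan as a minRowsRequested hint;
        // Spark's own Limit operator still enforces the exact row count.
        spark.sql("SELECT * FROM demo.db.events LIMIT 10").show();
      }
    }
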
 .../java/org/apache/iceberg/BatchScanAdapter.java  |  5 ++
 api/src/main/java/org/apache/iceberg/Scan.java     | 13 +++
 .../src/main/java/org/apache/iceberg/BaseScan.java |  5 ++
 .../java/org/apache/iceberg/TableScanContext.java  |  7 ++
 .../iceberg/spark/source/SparkScanBuilder.java     | 15 +++-
 .../iceberg/spark/source/TestFilteredScan.java     | 94 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/sql/TestSelect.java   | 13 +++
 7 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
index 02b3d241d8..d8c5dda885 100644
--- a/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
+++ b/api/src/main/java/org/apache/iceberg/BatchScanAdapter.java
@@ -151,4 +151,9 @@ class BatchScanAdapter implements BatchScan {
   public BatchScan metricsReporter(MetricsReporter reporter) {
     return new BatchScanAdapter(scan.metricsReporter(reporter));
   }
+
+  @Override
+  public BatchScan minRowsRequested(long numRows) {
+    return new BatchScanAdapter(scan.minRowsRequested(numRows));
+  }
 }
diff --git a/api/src/main/java/org/apache/iceberg/Scan.java b/api/src/main/java/org/apache/iceberg/Scan.java
index 339bc75336..9785ce6603 100644
--- a/api/src/main/java/org/apache/iceberg/Scan.java
+++ b/api/src/main/java/org/apache/iceberg/Scan.java
@@ -195,4 +195,17 @@ public interface Scan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>> {
     throw new UnsupportedOperationException(
         this.getClass().getName() + " doesn't implement metricsReporter");
   }
+
+  /**
+   * Create a new scan that returns files with at least the given number of rows. This is used as a
+   * hint and is entirely optional in order to not have to return more rows than necessary. This may
+   * return fewer rows if the scan does not contain that many, or it may return more than requested.
+   *
+   * @param numRows The minimum number of rows requested
+   * @return A new scan based on this with at least the given number of rows
+   */
+  default ThisT minRowsRequested(long numRows) {
+    throw new UnsupportedOperationException(
+        this.getClass().getName() + " doesn't implement minRowsRequested");
+  }
 }
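
A minimal usage sketch for the new Scan#minRowsRequested API (not part of the
commit; the catalog wiring and the DataFile#location() accessor are
assumptions):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.io.CloseableIterable;

    class MinRowsRequestedExample {
      static void planWithHint(Table table) throws Exception {
        // Ask planning to cover roughly 100 rows. The hint is best-effort:
        // the scan may return fewer rows (small table) or more (file
        // granularity), so callers still trim to their exact limit.
        try (CloseableIterable<FileScanTask> tasks =
            table.newScan().minRowsRequested(100L).planFiles()) {
          for (FileScanTask task : tasks) {
            System.out.println(task.file().location());
          }
        }
      }
    }

Because each refinement returns a new immutable scan, the hint composes with
other options such as row filters and column projection.
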
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 618b2e95f2..e40b1b743c 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -293,6 +293,11 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
     return newRefinedScan(table, schema, context.reportWith(reporter));
   }
 
+  @Override
+  public ThisT minRowsRequested(long numRows) {
+    return newRefinedScan(table, schema, context.minRowsRequested(numRows));
+  }
+
   /**
    * Retrieves a list of column names based on the type of manifest content provided.
    *
diff --git a/core/src/main/java/org/apache/iceberg/TableScanContext.java b/core/src/main/java/org/apache/iceberg/TableScanContext.java
index 5722ed7d8c..faa1c264d5 100644
--- a/core/src/main/java/org/apache/iceberg/TableScanContext.java
+++ b/core/src/main/java/org/apache/iceberg/TableScanContext.java
@@ -103,6 +103,9 @@ abstract class TableScanContext {
   @Nullable
   public abstract String branch();
 
+  @Nullable
+  public abstract Long minRowsRequested();
+
   TableScanContext useSnapshotId(Long scanSnapshotId) {
     return ImmutableTableScanContext.builder().from(this).snapshotId(scanSnapshotId).build();
   }
@@ -193,6 +196,10 @@ abstract class TableScanContext {
     return ImmutableTableScanContext.builder().from(this).branch(ref).build();
   }
 
+  TableScanContext minRowsRequested(long numRows) {
+    return ImmutableTableScanContext.builder().from(this).minRowsRequested(numRows).build();
+  }
+
   public static TableScanContext empty() {
     return ImmutableTableScanContext.builder().build();
   }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
index 968e6eeaaa..11b0ba58af 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java
@@ -71,6 +71,7 @@ import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 import org.apache.spark.sql.connector.read.Statistics;
 import org.apache.spark.sql.connector.read.SupportsPushDownAggregates;
+import org.apache.spark.sql.connector.read.SupportsPushDownLimit;
 import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
 import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters;
 import org.apache.spark.sql.connector.read.SupportsReportStatistics;
@@ -85,7 +86,8 @@ public class SparkScanBuilder
         SupportsPushDownAggregates,
         SupportsPushDownV2Filters,
         SupportsPushDownRequiredColumns,
-        SupportsReportStatistics {
+        SupportsReportStatistics,
+        SupportsPushDownLimit {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkScanBuilder.class);
   private static final Predicate[] NO_PREDICATES = new Predicate[0];
@@ -102,6 +104,7 @@ public class SparkScanBuilder
   private boolean caseSensitive;
   private List<Expression> filterExpressions = null;
   private Predicate[] pushedPredicates = NO_PREDICATES;
+  private Integer limit = null;
 
   SparkScanBuilder(
       SparkSession spark,
@@ -739,6 +742,10 @@ public class SparkScanBuilder
               TableProperties.SPLIT_OPEN_FILE_COST, String.valueOf(splitOpenFileCost));
     }
 
+    if (null != limit) {
+      configuredScan = configuredScan.minRowsRequested(limit.longValue());
+    }
+
     return configuredScan;
   }
 
@@ -759,4 +766,10 @@ public class SparkScanBuilder
       return table.newBatchScan();
     }
   }
+
+  @Override
+  public boolean pushLimit(int pushedLimit) {
+    this.limit = pushedLimit;
+    return true;
+  }
 }
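
A rough sketch (not part of the commit) of the DSv2 handshake these hunks hook
into; during planning it is Spark's optimizer, not user code, that drives
these calls:

    import org.apache.spark.sql.connector.read.Scan;
    import org.apache.spark.sql.connector.read.ScanBuilder;
    import org.apache.spark.sql.connector.read.SupportsPushDownLimit;

    class LimitPushdownSketch {
      // Approximates what Spark does when it sees a LIMIT above a V2 scan.
      static Scan planWithLimit(ScanBuilder builder, int limit) {
        if (builder instanceof SupportsPushDownLimit limitPushdown) {
          // Returning true from pushLimit means the source accepted the
          // limit. Since isPartiallyPushed() defaults to true, Spark keeps
          // its own Limit operator, so extra rows from the source are safe.
          limitPushdown.pushLimit(limit);
        }
        // For Iceberg, SparkScanBuilder#build applies the recorded limit to
        // the underlying scan as minRowsRequested.
        return builder.build();
      }
    }
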
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index 8d7053efb1..61d6501a68 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -23,6 +23,7 @@ import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
 import static org.apache.iceberg.data.FileHelpers.encrypt;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.io.File;
 import java.io.IOException;
@@ -73,6 +74,7 @@ import org.apache.spark.sql.sources.LessThan;
 import org.apache.spark.sql.sources.Not;
 import org.apache.spark.sql.sources.StringStartsWith;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.assertj.core.api.AbstractObjectAssert;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -267,6 +269,98 @@ public class TestFilteredScan {
             "ts < cast('2017-12-22 00:00:00+00:00' as timestamp)"));
   }
 
+  @TestTemplate
+  public void limitPushedDownToSparkScan() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path")), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    AbstractObjectAssert<?, ?> scanAssert = assertThat(builder.build()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+
+    // verify changelog scan
+    assertThat(builder.buildChangelogScan())
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify MoR scan
+    scanAssert = assertThat(builder.buildMergeOnReadScan()).extracting("scan");
+    if (LOCAL == planningMode) {
+      scanAssert = scanAssert.extracting("scan");
+    }
+
+    scanAssert.extracting("context").extracting("minRowsRequested").isEqualTo(limit);
+  }
+
+  @TestTemplate
+  public void limitPushedDownToSparkScanForMetadataTable() {
+    assumeThat(fileFormat)
+        .as("no need to run this across the entire test matrix")
+        .isEqualTo(FileFormat.PARQUET);
+
+    CaseInsensitiveStringMap options =
+        new CaseInsensitiveStringMap(ImmutableMap.of("path", unpartitioned.toString()));
+
+    // load the snapshots metadata table
+    SparkScanBuilder builder =
+        new SparkScanBuilder(spark, TABLES.load(options.get("path") + "#snapshots"), options);
+
+    long limit = 23;
+    // simulate Spark pushing down the limit to the scan builder
+    builder.pushLimit((int) limit);
+    assertThat(builder).extracting("limit").isEqualTo((int) limit);
+
+    // verify batch scan
+    assertThat(builder.build())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify CoW scan
+    assertThat(builder.buildCopyOnWriteScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+
+    // verify MoR scan
+    assertThat(builder.buildMergeOnReadScan())
+        .extracting("scan")
+        .extracting("scan")
+        .extracting("context")
+        .extracting("minRowsRequested")
+        .isEqualTo(limit);
+  }
+
   @TestTemplate
   public void testBucketPartitionedIDFilters() {
     Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID);
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 68b93be479..8fa8406278 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -150,6 +150,19 @@ public class TestSelect extends CatalogTestBase {
         .isEqualTo("(float IS NOT NULL AND is_nan(float))");
   }
 
+  @TestTemplate
+  public void selectWithLimit() {
+    Object[] first = row(1L, "a", 1.0F);
+    Object[] second = row(2L, "b", 2.0F);
+    Object[] third = row(3L, "c", Float.NaN);
+
+    // verify that LIMIT is properly applied in case SupportsPushDownLimit.isPartiallyPushed() is
+    // ever overridden in SparkScanBuilder
+    assertThat(sql("SELECT * FROM %s LIMIT 1", 
tableName)).containsExactly(first);
+    assertThat(sql("SELECT * FROM %s LIMIT 2", 
tableName)).containsExactly(first, second);
+    assertThat(sql("SELECT * FROM %s LIMIT 3", 
tableName)).containsExactly(first, second, third);
+  }
+
   @TestTemplate
   public void testProjection() {
     List<Object[]> expected = ImmutableList.of(row(1L), row(2L), row(3L));
