This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new c9f6c8423b Core: Send minRowsRequested to REST server (#15661)
c9f6c8423b is described below

commit c9f6c8423be7cbf52c9cf678896deca478b3b6d1
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Mar 17 16:57:45 2026 +0100

    Core: Send minRowsRequested to REST server (#15661)
---
 .../org/apache/iceberg/rest/CatalogHandlers.java   | 38 ++++++++++++++++-----
 .../org/apache/iceberg/rest/RESTTableScan.java     |  3 +-
 .../apache/iceberg/rest/TestRESTScanPlanning.java  | 39 ++++++++++++++++++++++
 3 files changed, 70 insertions(+), 10 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java 
b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index d75909b437..de74a2c895 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -854,7 +854,8 @@ public class CatalogHandlers {
           configuredScan,
           asyncPlanId,
           table.uuid().toString(),
-          tasksPerPlanTask.applyAsInt(configuredScan));
+          tasksPerPlanTask.applyAsInt(configuredScan),
+          request.minRowsRequested());
       return PlanTableScanResponse.builder()
           .withPlanId(asyncPlanId)
           .withPlanStatus(PlanStatus.SUBMITTED)
@@ -868,7 +869,8 @@ public class CatalogHandlers {
             configuredScan,
             planId,
             table.uuid().toString(),
-            tasksPerPlanTask.applyAsInt(configuredScan));
+            tasksPerPlanTask.applyAsInt(configuredScan),
+            request.minRowsRequested());
     List<String> nextPlanTasks =
         initial.second() == null
             ? Collections.emptyList()
@@ -967,22 +969,33 @@ public class CatalogHandlers {
     if (request.statsFields() != null) {
       configuredScan = 
configuredScan.includeColumnStats(request.statsFields());
     }
+    if (request.minRowsRequested() != null) {
+      configuredScan = 
configuredScan.minRowsRequested(request.minRowsRequested());
+    }
     configuredScan = configuredScan.caseSensitive(request.caseSensitive());
 
     return configuredScan;
   }
 
   /**
-   * Plans file scan tasks for a table scan, grouping them into plan tasks for 
pagination.
+   * Plans file scan tasks for a table scan, grouping them into plan tasks for 
pagination. Note that
+   * minRowsRequested is only a hint that lets the server avoid returning 
more rows than
+   * necessary. The server is not required to return that many rows, 
since the scan may not
+   * produce that many rows. The server may also return more rows than 
requested.
    *
    * @param scan the table scan to plan files for
    * @param planId the unique identifier for this plan
    * @param tableId the uuid of the table being scanned
    * @param tasksPerPlanTask number of file scan tasks to group per plan task
+   * @param minRowsRequested number of rows requested for the scan
    * @return the initial file scan tasks and the first plan task key
    */
   private static Pair<List<FileScanTask>, String> planFilesFor(
-      Scan<?, FileScanTask, ?> scan, String planId, String tableId, int 
tasksPerPlanTask) {
+      Scan<?, FileScanTask, ?> scan,
+      String planId,
+      String tableId,
+      int tasksPerPlanTask,
+      Long minRowsRequested) {
     try (CloseableIterable<FileScanTask> planTasks = scan.planFiles()) {
       String planTaskPrefix = planId + "-" + tableId + "-";
 
@@ -994,7 +1007,12 @@ public class CatalogHandlers {
         return Pair.of(Collections.emptyList(), planTaskKey);
       }
 
-      Iterable<List<FileScanTask>> taskGroupings = 
Iterables.partition(planTasks, tasksPerPlanTask);
+      Iterable<FileScanTask> limitedTasks =
+          null != minRowsRequested
+              ? Iterables.limit(planTasks, (int) Math.min(minRowsRequested, 
Integer.MAX_VALUE))
+              : planTasks;
+      Iterable<List<FileScanTask>> taskGroupings =
+          Iterables.partition(limitedTasks, tasksPerPlanTask);
       int planTaskSequence = 0;
       String previousPlanTask = null;
       String firstPlanTaskKey = null;
@@ -1019,12 +1037,14 @@ public class CatalogHandlers {
 
   @SuppressWarnings("FutureReturnValueIgnored")
   private static void asyncPlanFiles(
-      Scan<?, FileScanTask, ?> scan, String asyncPlanId, String tableId, int 
tasksPerPlanTask) {
+      Scan<?, FileScanTask, ?> scan,
+      String asyncPlanId,
+      String tableId,
+      int tasksPerPlanTask,
+      Long minRowsRequested) {
     IN_MEMORY_PLANNING_STATE.addAsyncPlan(asyncPlanId);
     CompletableFuture.runAsync(
-            () -> {
-              planFilesFor(scan, asyncPlanId, tableId, tasksPerPlanTask);
-            },
+            () -> planFilesFor(scan, asyncPlanId, tableId, tasksPerPlanTask, 
minRowsRequested),
             ASYNC_PLANNING_POOL)
         .whenComplete(
             (result, exception) -> {
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java 
b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index a78628ed87..5eaa4c2f27 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -156,7 +156,8 @@ class RESTTableScan extends DataTableScan {
             .withSelect(selectedColumns)
             .withFilter(filter())
             .withCaseSensitive(isCaseSensitive())
-            .withStatsFields(statsFields);
+            .withStatsFields(statsFields)
+            .withMinRowsRequested(context().minRowsRequested());
 
     if (startSnapshotId != null && endSnapshotId != null) {
       builder
diff --git 
a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java 
b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
index 734eaf485c..6ee257026b 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@@ -30,6 +30,7 @@ import static org.apache.iceberg.TestBase.SCHEMA;
 import static org.apache.iceberg.TestBase.SPEC;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectReader;
@@ -64,6 +65,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.rest.credentials.Credential;
 import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
 import org.apache.iceberg.rest.responses.ConfigResponse;
 import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
@@ -72,6 +74,7 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 public class TestRESTScanPlanning extends TestBaseWithRESTServer {
@@ -626,6 +629,42 @@ public class TestRESTScanPlanning extends 
TestBaseWithRESTServer {
     }
   }
 
+  @ParameterizedTest
+  @EnumSource(PlanningMode.class)
+  void scanPlanningWithMinRowsRequested(
+      Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder> 
planMode) {
+    configurePlanningBehavior(planMode);
+    Table table = restTableFor(restCatalog, "min_rows_requested_table");
+    table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+    setParserContext(table);
+
+    ArgumentCaptor<HTTPRequest> requestCaptor = 
ArgumentCaptor.forClass(HTTPRequest.class);
+
+    assertThat(table.newScan().minRowsRequested(1L).planFiles()).hasSize(1);
+    Mockito.verify(adapterForRESTServer, Mockito.atLeastOnce())
+        .execute(requestCaptor.capture(), any(), any(), any(), any());
+    assertThat(
+            requestCaptor.getAllValues().stream()
+                .filter(req -> req.body() instanceof PlanTableScanRequest)
+                .map(req -> (PlanTableScanRequest) req.body())
+                .reduce((first, second) -> second)
+                .get()
+                .minRowsRequested())
+        .isEqualTo(1L);
+
+    assertThat(table.newScan().minRowsRequested(100L).planFiles()).hasSize(3);
+    Mockito.verify(adapterForRESTServer, Mockito.atLeastOnce())
+        .execute(requestCaptor.capture(), any(), any(), any(), any());
+    assertThat(
+            requestCaptor.getAllValues().stream()
+                .filter(req -> req.body() instanceof PlanTableScanRequest)
+                .map(req -> (PlanTableScanRequest) req.body())
+                .reduce((first, second) -> second)
+                .get()
+                .minRowsRequested())
+        .isEqualTo(100L);
+  }
+
   @ParameterizedTest
   @EnumSource(PlanningMode.class)
   void remoteScanPlanningDeletesCancellation(

Reply via email to