This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c9f6c8423b Core: Sent minRowsRequested to REST server (#15661)
c9f6c8423b is described below
commit c9f6c8423be7cbf52c9cf678896deca478b3b6d1
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Mar 17 16:57:45 2026 +0100
Core: Sent minRowsRequested to REST server (#15661)
---
.../org/apache/iceberg/rest/CatalogHandlers.java | 38 ++++++++++++++++-----
.../org/apache/iceberg/rest/RESTTableScan.java | 3 +-
.../apache/iceberg/rest/TestRESTScanPlanning.java | 39 ++++++++++++++++++++++
3 files changed, 70 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index d75909b437..de74a2c895 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -854,7 +854,8 @@ public class CatalogHandlers {
configuredScan,
asyncPlanId,
table.uuid().toString(),
- tasksPerPlanTask.applyAsInt(configuredScan));
+ tasksPerPlanTask.applyAsInt(configuredScan),
+ request.minRowsRequested());
return PlanTableScanResponse.builder()
.withPlanId(asyncPlanId)
.withPlanStatus(PlanStatus.SUBMITTED)
@@ -868,7 +869,8 @@ public class CatalogHandlers {
configuredScan,
planId,
table.uuid().toString(),
- tasksPerPlanTask.applyAsInt(configuredScan));
+ tasksPerPlanTask.applyAsInt(configuredScan),
+ request.minRowsRequested());
List<String> nextPlanTasks =
initial.second() == null
? Collections.emptyList()
@@ -967,22 +969,33 @@ public class CatalogHandlers {
if (request.statsFields() != null) {
configuredScan =
configuredScan.includeColumnStats(request.statsFields());
}
+ if (request.minRowsRequested() != null) {
+ configuredScan =
configuredScan.minRowsRequested(request.minRowsRequested());
+ }
configuredScan = configuredScan.caseSensitive(request.caseSensitive());
return configuredScan;
}
/**
- * Plans file scan tasks for a table scan, grouping them into plan tasks for
pagination.
+ * Plans file scan tasks for a table scan, grouping them into plan tasks for
pagination. Note that
+ * minRowsRequested is used as a hint to the server to not have to return
more rows than
+ * necessary. It is not required for the server to return that many rows
since the scan may not
+ * produce that many rows. The server can also return more rows than
requested.
*
* @param scan the table scan to plan files for
* @param planId the unique identifier for this plan
* @param tableId the uuid of the table being scanned
* @param tasksPerPlanTask number of file scan tasks to group per plan task
+ * @param minRowsRequested number of rows requested for the scan
* @return the initial file scan tasks and the first plan task key
*/
private static Pair<List<FileScanTask>, String> planFilesFor(
- Scan<?, FileScanTask, ?> scan, String planId, String tableId, int
tasksPerPlanTask) {
+ Scan<?, FileScanTask, ?> scan,
+ String planId,
+ String tableId,
+ int tasksPerPlanTask,
+ Long minRowsRequested) {
try (CloseableIterable<FileScanTask> planTasks = scan.planFiles()) {
String planTaskPrefix = planId + "-" + tableId + "-";
@@ -994,7 +1007,12 @@ public class CatalogHandlers {
return Pair.of(Collections.emptyList(), planTaskKey);
}
- Iterable<List<FileScanTask>> taskGroupings =
Iterables.partition(planTasks, tasksPerPlanTask);
+ Iterable<FileScanTask> limitedTasks =
+ null != minRowsRequested
+ ? Iterables.limit(planTasks, (int) Math.min(minRowsRequested,
Integer.MAX_VALUE))
+ : planTasks;
+ Iterable<List<FileScanTask>> taskGroupings =
+ Iterables.partition(limitedTasks, tasksPerPlanTask);
int planTaskSequence = 0;
String previousPlanTask = null;
String firstPlanTaskKey = null;
@@ -1019,12 +1037,14 @@ public class CatalogHandlers {
@SuppressWarnings("FutureReturnValueIgnored")
private static void asyncPlanFiles(
- Scan<?, FileScanTask, ?> scan, String asyncPlanId, String tableId, int
tasksPerPlanTask) {
+ Scan<?, FileScanTask, ?> scan,
+ String asyncPlanId,
+ String tableId,
+ int tasksPerPlanTask,
+ Long minRowsRequested) {
IN_MEMORY_PLANNING_STATE.addAsyncPlan(asyncPlanId);
CompletableFuture.runAsync(
- () -> {
- planFilesFor(scan, asyncPlanId, tableId, tasksPerPlanTask);
- },
+ () -> planFilesFor(scan, asyncPlanId, tableId, tasksPerPlanTask,
minRowsRequested),
ASYNC_PLANNING_POOL)
.whenComplete(
(result, exception) -> {
diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index a78628ed87..5eaa4c2f27 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -156,7 +156,8 @@ class RESTTableScan extends DataTableScan {
.withSelect(selectedColumns)
.withFilter(filter())
.withCaseSensitive(isCaseSensitive())
- .withStatsFields(statsFields);
+ .withStatsFields(statsFields)
+ .withMinRowsRequested(context().minRowsRequested());
if (startSnapshotId != null && endSnapshotId != null) {
builder
diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
index 734eaf485c..6ee257026b 100644
--- a/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
+++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTScanPlanning.java
@@ -30,6 +30,7 @@ import static org.apache.iceberg.TestBase.SCHEMA;
import static org.apache.iceberg.TestBase.SPEC;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.ArgumentMatchers.any;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectReader;
@@ -64,6 +65,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.credentials.Credential;
import org.apache.iceberg.rest.credentials.ImmutableCredential;
+import org.apache.iceberg.rest.requests.PlanTableScanRequest;
import org.apache.iceberg.rest.responses.ConfigResponse;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.iceberg.rest.responses.FetchPlanningResultResponse;
@@ -72,6 +74,7 @@ import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
public class TestRESTScanPlanning extends TestBaseWithRESTServer {
@@ -626,6 +629,42 @@ public class TestRESTScanPlanning extends TestBaseWithRESTServer {
}
}
+ @ParameterizedTest
+ @EnumSource(PlanningMode.class)
+ void scanPlanningWithMinRowsRequested(
+ Function<TestPlanningBehavior.Builder, TestPlanningBehavior.Builder>
planMode) {
+ configurePlanningBehavior(planMode);
+ Table table = restTableFor(restCatalog, "min_rows_requested_table");
+ table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
+ setParserContext(table);
+
+ ArgumentCaptor<HTTPRequest> requestCaptor =
ArgumentCaptor.forClass(HTTPRequest.class);
+
+ assertThat(table.newScan().minRowsRequested(1L).planFiles()).hasSize(1);
+ Mockito.verify(adapterForRESTServer, Mockito.atLeastOnce())
+ .execute(requestCaptor.capture(), any(), any(), any(), any());
+ assertThat(
+ requestCaptor.getAllValues().stream()
+ .filter(req -> req.body() instanceof PlanTableScanRequest)
+ .map(req -> (PlanTableScanRequest) req.body())
+ .reduce((first, second) -> second)
+ .get()
+ .minRowsRequested())
+ .isEqualTo(1L);
+
+ assertThat(table.newScan().minRowsRequested(100L).planFiles()).hasSize(3);
+ Mockito.verify(adapterForRESTServer, Mockito.atLeastOnce())
+ .execute(requestCaptor.capture(), any(), any(), any(), any());
+ assertThat(
+ requestCaptor.getAllValues().stream()
+ .filter(req -> req.body() instanceof PlanTableScanRequest)
+ .map(req -> (PlanTableScanRequest) req.body())
+ .reduce((first, second) -> second)
+ .get()
+ .minRowsRequested())
+ .isEqualTo(100L);
+ }
+
@ParameterizedTest
@EnumSource(PlanningMode.class)
void remoteScanPlanningDeletesCancellation(