This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new cd8d2a3345 Core: Fix server side planning on empty tables in CatalogHandlers (#14660)
cd8d2a3345 is described below
commit cd8d2a3345cb387f1d735763ff3914ac4e2617e2
Author: Drew Gallardo <[email protected]>
AuthorDate: Wed Nov 26 09:01:29 2025 -0800
Core: Fix server side planning on empty tables in CatalogHandlers (#14660)
---
.../org/apache/iceberg/rest/CatalogHandlers.java | 70 +++++++++++++++-------
1 file changed, 48 insertions(+), 22 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index b08455408d..82695eaf78 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -685,24 +685,33 @@ public class CatalogHandlers {
}
String planId = "sync-" + UUID.randomUUID();
- planFilesFor(
- configuredScan,
- planId,
- table.uuid().toString(),
- tasksPerPlanTask.applyAsInt(configuredScan));
- Pair<List<FileScanTask>, String> initial = IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
- return PlanTableScanResponse.builder()
- .withPlanStatus(PlanStatus.COMPLETED)
- .withPlanId(planId)
- .withPlanTasks(IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second()))
- .withFileScanTasks(initial.first())
- .withDeleteFiles(
- initial.first().stream()
- .flatMap(task -> task.deletes().stream())
- .distinct()
- .collect(Collectors.toList()))
- .withSpecsById(table.specs())
- .build();
+ Pair<List<FileScanTask>, String> initial =
+ planFilesFor(
+ configuredScan,
+ planId,
+ table.uuid().toString(),
+ tasksPerPlanTask.applyAsInt(configuredScan));
+ List<String> nextPlanTasks =
+ initial.second() == null
+ ? Collections.emptyList()
+ : IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second());
+ PlanTableScanResponse.Builder builder =
+ PlanTableScanResponse.builder()
+ .withPlanStatus(PlanStatus.COMPLETED)
+ .withPlanId(planId)
+ .withFileScanTasks(initial.first())
+ .withDeleteFiles(
+ initial.first().stream()
+ .flatMap(task -> task.deletes().stream())
+ .distinct()
+ .collect(Collectors.toList()))
+ .withSpecsById(table.specs());
+
+ if (!nextPlanTasks.isEmpty()) {
+ builder.withPlanTasks(nextPlanTasks);
+ }
+
+ return builder.build();
}
/**
@@ -807,22 +816,39 @@ public class CatalogHandlers {
* @param planId the unique identifier for this plan
* @param tableId the uuid of the table being scanned
* @param tasksPerPlanTask number of file scan tasks to group per plan task
+ * @return the initial file scan tasks and the first plan task key
*/
- private static void planFilesFor(
+ private static Pair<List<FileScanTask>, String> planFilesFor(
Scan<?, FileScanTask, ?> scan, String planId, String tableId, int tasksPerPlanTask) {
- Iterable<List<FileScanTask>> taskGroupings =
- Iterables.partition(scan.planFiles(), tasksPerPlanTask);
+ Iterable<FileScanTask> planTasks = scan.planFiles();
+ String planTaskPrefix = planId + "-" + tableId + "-";
+
+ // Handle empty table scans
+ if (!planTasks.iterator().hasNext()) {
+ String planTaskKey = planTaskPrefix + "0";
+ // Add empty scan to planning state so async calls know the scan completed
+ IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList());
+ return Pair.of(Collections.emptyList(), planTaskKey);
+ }
+
+ Iterable<List<FileScanTask>> taskGroupings = Iterables.partition(planTasks, tasksPerPlanTask);
int planTaskSequence = 0;
String previousPlanTask = null;
+ String firstPlanTaskKey = null;
+ List<FileScanTask> initialFileScanTasks = null;
for (List<FileScanTask> taskGrouping : taskGroupings) {
- String planTaskKey = String.format("%s-%s-%s", planId, tableId, planTaskSequence++);
+ String planTaskKey = planTaskPrefix + planTaskSequence++;
IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
if (previousPlanTask != null) {
IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
+ } else {
+ firstPlanTaskKey = planTaskKey;
+ initialFileScanTasks = taskGrouping;
}
previousPlanTask = planTaskKey;
}
+ return Pair.of(initialFileScanTasks, firstPlanTaskKey);
}
@SuppressWarnings("FutureReturnValueIgnored")