This is an automated email from the ASF dual-hosted git repository.
amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 0d4d3a562f Core: Fix Async Planning handling in RESTCatalogAdapter for Remote Scan Planning (#14629)
0d4d3a562f is described below
commit 0d4d3a562ffd339faae7e1db41c2068dc77920e8
Author: Prashant Singh <[email protected]>
AuthorDate: Thu Nov 20 10:47:12 2025 -0800
Core: Fix Async Planning handling in RESTCatalogAdapter for Remote Scan Planning (#14629)
Co-authored-by: Prashant Singh <[email protected]>
Co-authored-by: Amogh Jahagirdar <[email protected]>
---
.../org/apache/iceberg/rest/CatalogHandlers.java | 30 ++++++++++--
.../apache/iceberg/rest/InMemoryPlanningState.java | 57 +++++++++++++++++++++-
2 files changed, 81 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index 9d1c6d6bbf..dca17bc520 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -661,7 +662,7 @@ public class CatalogHandlers {
tableScan = tableScan.caseSensitive(request.caseSensitive());
if (shouldPlanAsync.test(tableScan)) {
- String asyncPlanId = UUID.randomUUID().toString();
+ String asyncPlanId = "async-" + UUID.randomUUID();
asyncPlanFiles(tableScan, asyncPlanId,
tasksPerPlanTask.applyAsInt(tableScan));
return PlanTableScanResponse.builder()
.withPlanId(asyncPlanId)
@@ -670,11 +671,12 @@ public class CatalogHandlers {
.build();
}
- String planId = UUID.randomUUID().toString();
+ String planId = "sync-" + UUID.randomUUID();
planFilesFor(tableScan, planId, tasksPerPlanTask.applyAsInt(tableScan));
Pair<List<FileScanTask>, String> initial =
IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
return PlanTableScanResponse.builder()
.withPlanStatus(PlanStatus.COMPLETED)
+ .withPlanId(planId)
.withPlanTasks(IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second()))
.withFileScanTasks(initial.first())
.withDeleteFiles(
@@ -697,6 +699,11 @@ public class CatalogHandlers {
public static FetchPlanningResultResponse fetchPlanningResult(
Catalog catalog, TableIdentifier ident, String planId) {
Table table = catalog.loadTable(ident);
+ PlanStatus status = IN_MEMORY_PLANNING_STATE.asyncPlanStatus(planId);
+ if (status != PlanStatus.COMPLETED) {
+ return
FetchPlanningResultResponse.builder().withPlanStatus(status).build();
+ }
+
Pair<List<FileScanTask>, String> initial =
IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
return FetchPlanningResultResponse.builder()
.withPlanStatus(PlanStatus.COMPLETED)
@@ -743,12 +750,11 @@ public class CatalogHandlers {
* @param planId the plan identifier to cancel
*/
public static void cancelPlanTableScan(String planId) {
- IN_MEMORY_PLANNING_STATE.removePlan(planId);
+ IN_MEMORY_PLANNING_STATE.cancelPlan(planId);
}
+  /** Resets all in-memory planning state: plan tasks, task links and async plan statuses. */
static void clearPlanningState() {
+    // ASYNC_PLANNING_POOL is no longer shut down here, presumably so that async planning keeps
+    // working after the state is cleared (e.g. between tests) -- confirm.
InMemoryPlanningState.getInstance().clear();
- ASYNC_PLANNING_POOL.shutdown();
}
/**
@@ -775,8 +781,22 @@ public class CatalogHandlers {
}
}
+  /**
+   * Plans {@code tableScan} on ASYNC_PLANNING_POOL and records the outcome (COMPLETED or FAILED)
+   * for {@code asyncPlanId} in the in-memory planning state.
+   */
+ @SuppressWarnings("FutureReturnValueIgnored")
private static void asyncPlanFiles(
TableScan tableScan, String asyncPlanId, int tasksPerPlanTask) {
- ASYNC_PLANNING_POOL.execute(() -> planFilesFor(tableScan, asyncPlanId, tasksPerPlanTask));
+    // Register the plan as SUBMITTED before scheduling, so the completion callback and any status
+    // lookup made with the returned plan id can always find it.
+    IN_MEMORY_PLANNING_STATE.addAsyncPlan(asyncPlanId);
+    Runnable planning = () -> planFilesFor(tableScan, asyncPlanId, tasksPerPlanTask);
+    // The future itself is not kept; the outcome is only recorded in the planning state.
+    CompletableFuture.runAsync(planning, ASYNC_PLANNING_POOL)
+        .whenComplete(
+            (ignored, failure) -> {
+              if (failure == null) {
+                IN_MEMORY_PLANNING_STATE.markAsyncPlanAsComplete(asyncPlanId);
+              } else {
+                IN_MEMORY_PLANNING_STATE.markAsyncPlanFailed(asyncPlanId);
+              }
+            });
}
}
diff --git a/core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java b/core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java
index 2aed2f7cf5..b90740c4fa 100644
--- a/core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java
+++ b/core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.exceptions.NoSuchPlanIdException;
import org.apache.iceberg.exceptions.NoSuchPlanTaskException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -43,10 +44,12 @@ class InMemoryPlanningState {
private final Map<String, List<FileScanTask>> planTaskToFileScanTasks;
private final Map<String, String> planTaskToNext;
+  // Status of each async plan keyed by plan id: SUBMITTED until planning completes, fails or is
+  // cancelled. It may be updated from async planning threads as well as request threads.
+ private final Map<String, PlanStatus> asyncPlanningStates;
private InMemoryPlanningState() {
this.planTaskToFileScanTasks = Maps.newConcurrentMap();
this.planTaskToNext = Maps.newConcurrentMap();
+ this.asyncPlanningStates = Maps.newConcurrentMap();
}
static InMemoryPlanningState getInstance() {
@@ -68,6 +71,44 @@ class InMemoryPlanningState {
planTaskToNext.put(currentTask, nextTask);
}
+  /**
+   * Registers a new async plan in the SUBMITTED state.
+   *
+   * @param plan the async plan id
+   * @throws IllegalArgumentException if a plan with the same id is already registered
+   */
+  void addAsyncPlan(String plan) {
+    // putIfAbsent turns the existence check and the registration into a single atomic step.
+    PlanStatus existingStatus = asyncPlanningStates.putIfAbsent(plan, PlanStatus.SUBMITTED);
+    Preconditions.checkArgument(
+        existingStatus == null, "Plan %s already exists with status %s", plan, existingStatus);
+  }
+
+  /**
+   * Returns the current status of an async plan.
+   *
+   * @param plan the async plan id
+   * @return the plan's current status
+   * @throws NoSuchPlanIdException if no async plan with this id is registered
+   */
+  PlanStatus asyncPlanStatus(String plan) {
+    // Read the map once: a second lookup could observe a concurrent clear() and return null.
+    PlanStatus status = asyncPlanningStates.get(plan);
+    if (status == null) {
+      throw new NoSuchPlanIdException("Cannot find plan with id %s", plan);
+    }
+
+    return status;
+  }
+
+  /**
+   * Marks a SUBMITTED async plan as COMPLETED.
+   *
+   * <p>A plan that was cancelled while planning was still running stays CANCELLED.
+   *
+   * @param plan the async plan id
+   * @throws IllegalArgumentException if the plan is unknown or is already COMPLETED or FAILED
+   */
+  void markAsyncPlanAsComplete(String plan) {
+    // compute() performs the check and the transition atomically, so a concurrent cancelPlan()
+    // is never overwritten; if the function throws, the existing mapping is left unchanged.
+    asyncPlanningStates.compute(
+        plan,
+        (id, existingStatus) -> {
+          Preconditions.checkArgument(existingStatus != null, "Cannot find plan %s", id);
+          if (existingStatus == PlanStatus.CANCELLED) {
+            // Cancelled while planning: keep the cancellation.
+            return existingStatus;
+          }
+
+          Preconditions.checkArgument(
+              existingStatus == PlanStatus.SUBMITTED,
+              "Cannot mark plan %s as completed as it is %s",
+              id,
+              existingStatus);
+          return PlanStatus.COMPLETED;
+        });
+  }
+
+  /**
+   * Marks a SUBMITTED async plan as FAILED.
+   *
+   * <p>A plan that was cancelled while planning was still running stays CANCELLED.
+   *
+   * @param plan the async plan id
+   * @throws IllegalArgumentException if the plan is unknown or is already COMPLETED or FAILED
+   */
+  void markAsyncPlanFailed(String plan) {
+    // compute() performs the check and the transition atomically, so a concurrent cancelPlan()
+    // is never overwritten; if the function throws, the existing mapping is left unchanged.
+    asyncPlanningStates.compute(
+        plan,
+        (id, existingStatus) -> {
+          Preconditions.checkArgument(existingStatus != null, "Cannot find plan %s", id);
+          if (existingStatus == PlanStatus.CANCELLED) {
+            // Cancelled while planning: keep the cancellation.
+            return existingStatus;
+          }
+
+          Preconditions.checkArgument(
+              existingStatus == PlanStatus.SUBMITTED,
+              "Cannot mark plan %s as failed as it is %s",
+              id,
+              existingStatus);
+          return PlanStatus.FAILED;
+        });
+  }
+
List<FileScanTask> fileScanTasksForPlanTask(String planTaskKey) {
List<FileScanTask> tasks = planTaskToFileScanTasks.get(planTaskKey);
if (tasks == null) {
@@ -120,13 +161,27 @@ class InMemoryPlanningState {
return Pair.of(initialEntry.getValue(), initialEntry.getKey());
}
+  /**
+   * Cancels a plan: drops every plan task whose key contains the plan id and, if the plan is an
+   * in-flight async plan, marks it CANCELLED.
+   *
+   * @param planId the plan id to cancel
+   */
- void removePlan(String planId) {
+ void cancelPlan(String planId) {
planTaskToNext.entrySet().removeIf(entry -> entry.getKey().contains(planId));
planTaskToFileScanTasks.entrySet().removeIf(entry -> entry.getKey().contains(planId));
+    // Only an in-flight (SUBMITTED) async plan moves to CANCELLED. The conditional replace is
+    // atomic, so it cannot race with the planning pool marking the plan completed or failed. It is
+    // a no-op for unknown plans (e.g. sync plans, which are never registered) and for plans that
+    // already terminated, so cancellation never fails.
+    asyncPlanningStates.replace(planId, PlanStatus.SUBMITTED, PlanStatus.CANCELLED);
}
+  /** Drops every plan task, task link and async plan status held by this instance. */
void clear() {
planTaskToFileScanTasks.clear();
planTaskToNext.clear();
+ asyncPlanningStates.clear();
}
}