This is an automated email from the ASF dual-hosted git repository.

amoghj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 0d4d3a562f Core: Fix Async Planning handling in RESTCatalogAdapter for 
Remote Scan Planning (#14629)
0d4d3a562f is described below

commit 0d4d3a562ffd339faae7e1db41c2068dc77920e8
Author: Prashant Singh <[email protected]>
AuthorDate: Thu Nov 20 10:47:12 2025 -0800

    Core: Fix Async Planning handling in RESTCatalogAdapter for Remote Scan 
Planning (#14629)
    
    Co-authored-by: Prashant Singh <[email protected]>
    Co-authored-by: Amogh Jahagirdar <[email protected]>
---
 .../org/apache/iceberg/rest/CatalogHandlers.java   | 30 ++++++++++--
 .../apache/iceberg/rest/InMemoryPlanningState.java | 57 +++++++++++++++++++++-
 2 files changed, 81 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java 
b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index 9d1c6d6bbf..dca17bc520 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -661,7 +662,7 @@ public class CatalogHandlers {
     tableScan = tableScan.caseSensitive(request.caseSensitive());
 
     if (shouldPlanAsync.test(tableScan)) {
-      String asyncPlanId = UUID.randomUUID().toString();
+      String asyncPlanId = "async-" + UUID.randomUUID();
       asyncPlanFiles(tableScan, asyncPlanId, 
tasksPerPlanTask.applyAsInt(tableScan));
       return PlanTableScanResponse.builder()
           .withPlanId(asyncPlanId)
@@ -670,11 +671,12 @@ public class CatalogHandlers {
           .build();
     }
 
-    String planId = UUID.randomUUID().toString();
+    String planId = "sync-" + UUID.randomUUID();
     planFilesFor(tableScan, planId, tasksPerPlanTask.applyAsInt(tableScan));
     Pair<List<FileScanTask>, String> initial = 
IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
     return PlanTableScanResponse.builder()
         .withPlanStatus(PlanStatus.COMPLETED)
+        .withPlanId(planId)
         .withPlanTasks(IN_MEMORY_PLANNING_STATE.nextPlanTask(initial.second()))
         .withFileScanTasks(initial.first())
         .withDeleteFiles(
@@ -697,6 +699,11 @@ public class CatalogHandlers {
   public static FetchPlanningResultResponse fetchPlanningResult(
       Catalog catalog, TableIdentifier ident, String planId) {
     Table table = catalog.loadTable(ident);
+    PlanStatus status = IN_MEMORY_PLANNING_STATE.asyncPlanStatus(planId);
+    if (status != PlanStatus.COMPLETED) {
+      return 
FetchPlanningResultResponse.builder().withPlanStatus(status).build();
+    }
+
     Pair<List<FileScanTask>, String> initial = 
IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
     return FetchPlanningResultResponse.builder()
         .withPlanStatus(PlanStatus.COMPLETED)
@@ -743,12 +750,11 @@ public class CatalogHandlers {
    * @param planId the plan identifier to cancel
    */
   public static void cancelPlanTableScan(String planId) {
-    IN_MEMORY_PLANNING_STATE.removePlan(planId);
+    IN_MEMORY_PLANNING_STATE.cancelPlan(planId);
   }
 
   static void clearPlanningState() {
     InMemoryPlanningState.getInstance().clear();
-    ASYNC_PLANNING_POOL.shutdown();
   }
 
   /**
@@ -775,8 +781,22 @@ public class CatalogHandlers {
     }
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private static void asyncPlanFiles(
       TableScan tableScan, String asyncPlanId, int tasksPerPlanTask) {
-    ASYNC_PLANNING_POOL.execute(() -> planFilesFor(tableScan, asyncPlanId, 
tasksPerPlanTask));
+    IN_MEMORY_PLANNING_STATE.addAsyncPlan(asyncPlanId);
+    CompletableFuture.runAsync(
+            () -> {
+              planFilesFor(tableScan, asyncPlanId, tasksPerPlanTask);
+            },
+            ASYNC_PLANNING_POOL)
+        .whenComplete(
+            (result, exception) -> {
+              if (exception != null) {
+                IN_MEMORY_PLANNING_STATE.markAsyncPlanFailed(asyncPlanId);
+              } else {
+                IN_MEMORY_PLANNING_STATE.markAsyncPlanAsComplete(asyncPlanId);
+              }
+            });
   }
 }
diff --git 
a/core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java 
b/core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java
index 2aed2f7cf5..b90740c4fa 100644
--- a/core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java
+++ b/core/src/main/java/org/apache/iceberg/rest/InMemoryPlanningState.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.exceptions.NoSuchPlanIdException;
 import org.apache.iceberg.exceptions.NoSuchPlanTaskException;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.base.Splitter;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -43,10 +44,12 @@ class InMemoryPlanningState {
 
   private final Map<String, List<FileScanTask>> planTaskToFileScanTasks;
   private final Map<String, String> planTaskToNext;
+  private final Map<String, PlanStatus> asyncPlanningStates;
 
   private InMemoryPlanningState() {
     this.planTaskToFileScanTasks = Maps.newConcurrentMap();
     this.planTaskToNext = Maps.newConcurrentMap();
+    this.asyncPlanningStates = Maps.newConcurrentMap();
   }
 
   static InMemoryPlanningState getInstance() {
@@ -68,6 +71,44 @@ class InMemoryPlanningState {
     planTaskToNext.put(currentTask, nextTask);
   }
 
+  void addAsyncPlan(String plan) {
+    PlanStatus existingStatus = asyncPlanningStates.get(plan);
+    Preconditions.checkArgument(
+        existingStatus == null, "Plan %s already exists with status %s", plan, 
existingStatus);
+    asyncPlanningStates.put(plan, PlanStatus.SUBMITTED);
+  }
+
+  /**
+   * Returns the tracked status of an async plan.
+   *
+   * @param plan the async plan id
+   * @return the status currently recorded for the plan
+   * @throws NoSuchPlanIdException if no status is recorded for the plan id
+   */
+  PlanStatus asyncPlanStatus(String plan) {
+    PlanStatus existingStatus = asyncPlanningStates.get(plan);
+    if (existingStatus == null) {
+      throw new NoSuchPlanIdException("Cannot find plan with id %s", plan);
+    }
+
+    // Return the value already checked; a second lookup could observe a concurrent clear().
+    return existingStatus;
+  }
+
+  void markAsyncPlanAsComplete(String plan) {
+    PlanStatus existingStatus = asyncPlanningStates.get(plan);
+    Preconditions.checkArgument(existingStatus != null, "Cannot find plan %s", 
plan);
+    Preconditions.checkArgument(
+        existingStatus == PlanStatus.SUBMITTED,
+        "Cannot mark plan %s as completed as it is %s",
+        plan,
+        existingStatus);
+    asyncPlanningStates.put(plan, PlanStatus.COMPLETED);
+  }
+
+  /**
+   * Moves a submitted async plan to {@link PlanStatus#FAILED}.
+   *
+   * @param plan the async plan id
+   * @throws IllegalArgumentException if the plan is unknown or is no longer SUBMITTED
+   */
+  void markAsyncPlanFailed(String plan) {
+    PlanStatus existingStatus = asyncPlanningStates.get(plan);
+    Preconditions.checkArgument(existingStatus != null, "Cannot find plan %s", 
plan);
+    Preconditions.checkArgument(
+        existingStatus == PlanStatus.SUBMITTED,
+        "Cannot mark plan %s as failed as it is %s",
+        plan,
+        existingStatus);
+    asyncPlanningStates.put(plan, PlanStatus.FAILED);
+  }
+
   List<FileScanTask> fileScanTasksForPlanTask(String planTaskKey) {
     List<FileScanTask> tasks = planTaskToFileScanTasks.get(planTaskKey);
     if (tasks == null) {
@@ -120,13 +161,27 @@ class InMemoryPlanningState {
     return Pair.of(initialEntry.getValue(), initialEntry.getKey());
   }
 
-  void removePlan(String planId) {
+  /**
+   * Cancels a plan: drops its stored plan tasks and, if it is an async plan that is still
+   * SUBMITTED, records it as CANCELLED. Unknown or already-terminated plans are left untouched.
+   *
+   * @param planId the plan identifier to cancel
+   */
+  void cancelPlan(String planId) {
     planTaskToNext.entrySet().removeIf(entry -> 
entry.getKey().contains(planId));
     planTaskToFileScanTasks.entrySet().removeIf(entry -> 
entry.getKey().contains(planId));
+    // Atomically flip SUBMITTED -> CANCELLED so a concurrent completion/failure update is not
+    // overwritten; plans that are unknown or already terminated are a no-op.
+    asyncPlanningStates.replace(planId, PlanStatus.SUBMITTED, PlanStatus.CANCELLED);
   }
 
   void clear() {
     planTaskToFileScanTasks.clear();
     planTaskToNext.clear();
+    asyncPlanningStates.clear();
   }
 }

Reply via email to