This is an automated email from the ASF dual-hosted git repository.

singhpk234 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new f154dafb22 Core: Support incremental Scan in RESTCatalogAdapter for RemoteScanPlanning (#14661)
f154dafb22 is described below

commit f154dafb22676d69667f3b2380b2e9473601d4ae
Author: Prashant Singh <[email protected]>
AuthorDate: Mon Nov 24 15:59:04 2025 -0800

    Core: Support incremental Scan in RESTCatalogAdapter for RemoteScanPlanning (#14661)
---
 .../org/apache/iceberg/rest/CatalogHandlers.java   | 96 ++++++++++++++++------
 .../apache/iceberg/rest/RESTCatalogAdapter.java    |  5 +-
 2 files changed, 73 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index dca17bc520..b08455408d 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -42,8 +42,10 @@ import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.BaseTransaction;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.IncrementalAppendScan;
 import org.apache.iceberg.MetadataUpdate.UpgradeFormatVersion;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Scan;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
@@ -641,29 +643,40 @@ public class CatalogHandlers {
       Catalog catalog,
       TableIdentifier ident,
       PlanTableScanRequest request,
-      Predicate<TableScan> shouldPlanAsync,
-      ToIntFunction<TableScan> tasksPerPlanTask) {
+      Predicate<Scan<?, FileScanTask, ?>> shouldPlanAsync,
+      ToIntFunction<Scan<?, FileScanTask, ?>> tasksPerPlanTask) {
     Table table = catalog.loadTable(ident);
-    TableScan tableScan = table.newScan();
+    // Configure the appropriate scan type
+    Scan<?, FileScanTask, ?> configuredScan;
+
+    if (request.startSnapshotId() != null && request.endSnapshotId() != null) {
+      // Incremental append scan for reading changes between snapshots
+      IncrementalAppendScan incrementalScan =
+          table
+              .newIncrementalAppendScan()
+              .fromSnapshotInclusive(request.startSnapshotId())
+              .toSnapshot(request.endSnapshotId());
+
+      configuredScan = configureScan(incrementalScan, request);
+    } else {
+      // Regular table scan at a specific snapshot
+      TableScan tableScan = table.newScan();
 
-    if (request.snapshotId() != null) {
-      tableScan = tableScan.useSnapshot(request.snapshotId());
-    }
-    if (request.select() != null) {
-      tableScan = tableScan.select(request.select());
-    }
-    if (request.filter() != null) {
-      tableScan = tableScan.filter(request.filter());
-    }
-    if (request.statsFields() != null) {
-      tableScan = tableScan.includeColumnStats(request.statsFields());
-    }
+      if (request.snapshotId() != null) {
+        tableScan = tableScan.useSnapshot(request.snapshotId());
+      }
 
-    tableScan = tableScan.caseSensitive(request.caseSensitive());
+      // Apply filters and projections using common method
+      configuredScan = configureScan(tableScan, request);
+    }
 
-    if (shouldPlanAsync.test(tableScan)) {
+    if (shouldPlanAsync.test(configuredScan)) {
       String asyncPlanId = "async-" + UUID.randomUUID();
-      asyncPlanFiles(tableScan, asyncPlanId, tasksPerPlanTask.applyAsInt(tableScan));
+      asyncPlanFiles(
+          configuredScan,
+          asyncPlanId,
+          table.uuid().toString(),
+          tasksPerPlanTask.applyAsInt(configuredScan));
       return PlanTableScanResponse.builder()
           .withPlanId(asyncPlanId)
           .withPlanStatus(PlanStatus.SUBMITTED)
@@ -672,7 +685,11 @@ public class CatalogHandlers {
     }
 
     String planId = "sync-" + UUID.randomUUID();
-    planFilesFor(tableScan, planId, tasksPerPlanTask.applyAsInt(tableScan));
+    planFilesFor(
+        configuredScan,
+        planId,
+        table.uuid().toString(),
+        tasksPerPlanTask.applyAsInt(configuredScan));
     Pair<List<FileScanTask>, String> initial = IN_MEMORY_PLANNING_STATE.initialScanTasksFor(planId);
     return PlanTableScanResponse.builder()
         .withPlanStatus(PlanStatus.COMPLETED)
@@ -757,21 +774,48 @@ public class CatalogHandlers {
     InMemoryPlanningState.getInstance().clear();
   }
 
+  /**
+   * Applies filters, projections, and other scan configurations from the request to the scan.
+   *
+   * @param scan the scan to configure
+   * @param request the plan table scan request containing filters and projections
+   * @param <T> the specific scan type (TableScan, IncrementalAppendScan, etc.)
+   * @return the configured scan with filters and projections applied
+   */
+  private static <T extends Scan<T, FileScanTask, ?>> T configureScan(
+      T scan, PlanTableScanRequest request) {
+    T configuredScan = scan;
+
+    if (request.select() != null) {
+      configuredScan = configuredScan.select(request.select());
+    }
+    if (request.filter() != null) {
+      configuredScan = configuredScan.filter(request.filter());
+    }
+    if (request.statsFields() != null) {
+      configuredScan = configuredScan.includeColumnStats(request.statsFields());
+    }
+    configuredScan = configuredScan.caseSensitive(request.caseSensitive());
+
+    return configuredScan;
+  }
+
   /**
   * Plans file scan tasks for a table scan, grouping them into plan tasks for pagination.
    *
-   * @param tableScan the table scan to plan
+   * @param scan the table scan to plan files for
    * @param planId the unique identifier for this plan
+   * @param tableId the uuid of the table being scanned
    * @param tasksPerPlanTask number of file scan tasks to group per plan task
    */
-  private static void planFilesFor(TableScan tableScan, String planId, int tasksPerPlanTask) {
+  private static void planFilesFor(
+      Scan<?, FileScanTask, ?> scan, String planId, String tableId, int tasksPerPlanTask) {
     Iterable<List<FileScanTask>> taskGroupings =
-        Iterables.partition(tableScan.planFiles(), tasksPerPlanTask);
+        Iterables.partition(scan.planFiles(), tasksPerPlanTask);
     int planTaskSequence = 0;
     String previousPlanTask = null;
     for (List<FileScanTask> taskGrouping : taskGroupings) {
-      String planTaskKey =
-          String.format("%s-%s-%s", planId, tableScan.table().uuid(), planTaskSequence++);
+      String planTaskKey = String.format("%s-%s-%s", planId, tableId, planTaskSequence++);
       IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
       if (previousPlanTask != null) {
         IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
@@ -783,11 +827,11 @@ public class CatalogHandlers {
 
   @SuppressWarnings("FutureReturnValueIgnored")
   private static void asyncPlanFiles(
-      TableScan tableScan, String asyncPlanId, int tasksPerPlanTask) {
+      Scan<?, FileScanTask, ?> scan, String asyncPlanId, String tableId, int tasksPerPlanTask) {
     IN_MEMORY_PLANNING_STATE.addAsyncPlan(asyncPlanId);
     CompletableFuture.runAsync(
             () -> {
-              planFilesFor(tableScan, asyncPlanId, tasksPerPlanTask);
+              planFilesFor(scan, asyncPlanId, tableId, tasksPerPlanTask);
             },
             ASYNC_PLANNING_POOL)
         .whenComplete(
diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
index 1a7a0e03d5..ff6daa61e3 100644
--- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
+++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java
@@ -31,8 +31,9 @@ import java.util.stream.Collectors;
 import org.apache.http.HttpHeaders;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.BaseTransaction;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Scan;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.TableScan;
 import org.apache.iceberg.Transaction;
 import org.apache.iceberg.Transactions;
 import org.apache.iceberg.catalog.Catalog;
@@ -588,7 +589,7 @@ public class RESTCatalogAdapter extends BaseHTTPClient {
       return 100;
     }
 
-    default boolean shouldPlanTableScanAsync(TableScan tableScan) {
+    default boolean shouldPlanTableScanAsync(Scan<?, FileScanTask, ?> scan) {
       return false;
     }
   }

Reply via email to