This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 39ed7e4453 Core: Track & close FileIO used for remote scan planning (#15439)
39ed7e4453 is described below

commit 39ed7e445361b2ff83887795e7e8d9f92ed45abc
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Mar 3 09:50:17 2026 +0100

    Core: Track & close FileIO used for remote scan planning (#15439)
---
 .../org/apache/iceberg/rest/RESTTableScan.java     | 50 ++++++++++++++++------
 1 file changed, 38 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
index 460c6896e9..706d83817b 100644
--- a/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
+++ b/core/src/main/java/org/apache/iceberg/rest/RESTTableScan.java
@@ -18,6 +18,9 @@
  */
 package org.apache.iceberg.rest;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
 import dev.failsafe.Failsafe;
 import dev.failsafe.FailsafeException;
 import dev.failsafe.RetryPolicy;
@@ -58,6 +61,17 @@ class RESTTableScan extends DataTableScan {
   private static final long MAX_WAIT_TIME_MS = 5 * 60 * 1000; // Total maximum 
duration (5 minutes)
   private static final double SCALE_FACTOR = 2.0; // Exponential scale factor
   private static final String DEFAULT_FILE_IO_IMPL = 
"org.apache.iceberg.io.ResolvingFileIO";
+  private static final Cache<RESTTableScan, FileIO> FILEIO_TRACKER =
+      Caffeine.newBuilder()
+          .weakKeys()
+          .removalListener(
+              (RemovalListener<RESTTableScan, FileIO>)
+                  (scan, io, cause) -> {
+                    if (null != io) {
+                      io.close();
+                    }
+                  })
+          .build();
 
   private final RESTClient client;
   private final Map<String, String> headers;
@@ -199,16 +213,19 @@ class RESTTableScan extends DataTableScan {
   }
 
   private FileIO fileIOForPlanId(List<Credential> storageCredentials) {
-    return CatalogUtil.loadFileIO(
-        catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, 
DEFAULT_FILE_IO_IMPL),
-        ImmutableMap.<String, String>builder()
-            .putAll(catalogProperties)
-            .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
-            .buildKeepingLast(),
-        hadoopConf,
-        storageCredentials.stream()
-            .map(c -> StorageCredential.create(c.prefix(), c.config()))
-            .collect(Collectors.toList()));
+    FileIO ioForScan =
+        CatalogUtil.loadFileIO(
+            catalogProperties.getOrDefault(CatalogProperties.FILE_IO_IMPL, 
DEFAULT_FILE_IO_IMPL),
+            ImmutableMap.<String, String>builder()
+                .putAll(catalogProperties)
+                .put(RESTCatalogProperties.REST_SCAN_PLAN_ID, planId)
+                .buildKeepingLast(),
+            hadoopConf,
+            storageCredentials.stream()
+                .map(c -> StorageCredential.create(c.prefix(), c.config()))
+                .collect(Collectors.toList()));
+    FILEIO_TRACKER.put(this, ioForScan);
+    return ioForScan;
   }
 
   private CloseableIterable<FileScanTask> fetchPlanningResult() {
@@ -236,7 +253,7 @@ class RESTTableScan extends DataTableScan {
                       "Polling for plan {} failed due to: {}",
                       planId,
                       e.getException().getMessage());
-                  cancelPlan();
+                  cleanupPlanResources();
                 })
             .build();
 
@@ -271,7 +288,7 @@ class RESTTableScan extends DataTableScan {
     } catch (Exception e) {
       // Catch any immediate non-retryable exceptions (e.g., I/O errors, auth 
errors)
       try {
-        cancelPlan();
+        cleanupPlanResources();
       } catch (Exception cancelException) {
         // Ignore cancellation failures during exception handling
         e.addSuppressed(cancelException);
@@ -299,6 +316,15 @@ class RESTTableScan extends DataTableScan {
         this::cancelPlan);
   }
 
+  /** Cancels the plan on the server (if supported) and closes the plan-scoped 
FileIO */
+  private void cleanupPlanResources() {
+    cancelPlan();
+    if (null != fileIOForPlanId) {
+      FILEIO_TRACKER.invalidate(this);
+      this.fileIOForPlanId = null;
+    }
+  }
+
   @VisibleForTesting
   @SuppressWarnings("checkstyle:RegexpMultiline")
   public boolean cancelPlan() {

Reply via email to