This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch close-planfiles-iterable
in repository https://gitbox.apache.org/repos/asf/iceberg.git

commit 1270b7270020706efdc0aa84589b9b50bbc36f4e
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Fri Dec 19 12:18:15 2025 +0100

    Core: Close planFiles() iterable
    
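    `CatalogHandlers.planFilesFor()` never closed the `CloseableIterable`
    returned by `scan.planFiles()`, so the manifest input streams opened while
    iterating it were left open until the finalizer flagged them. Wrap the
    iterable in a try-with-resources block so it is closed once planning is done.
    
    This showed up as the following warning when running `TestRemoteScanPlanning`: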
    
    ```
    [Finalizer] WARN org.apache.iceberg.hadoop.HadoopStreams - Unclosed input stream created by:
            org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.<init>(HadoopStreams.java:90)
            org.apache.iceberg.hadoop.HadoopStreams.wrap(HadoopStreams.java:54)
            org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183)
            org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:103)
            org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:78)
            org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
            org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
            org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
            org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
            org.apache.iceberg.io.CloseableIterable.lambda$filter$0(CloseableIterable.java:126)
            org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:89)
            org.apache.iceberg.io.CloseableIterable.lambda$filter$1(CloseableIterable.java:153)
            org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:89)
            org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
            org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
            org.apache.iceberg.ManifestGroup$1.iterator(ManifestGroup.java:347)
            org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.hasNext(CloseableIterable.java:274)
            org.apache.iceberg.rest.CatalogHandlers.planFilesFor(CatalogHandlers.java:813)
            org.apache.iceberg.rest.CatalogHandlers.planTableScan(CatalogHandlers.java:690)
            org.apache.iceberg.rest.RESTCatalogAdapter.handleRequest(RESTCatalogAdapter.java:321)
            org.apache.iceberg.rest.RESTServerCatalogAdapter.handleRequest(RESTServerCatalogAdapter.java:47)
            org.apache.iceberg.rest.RESTCatalogAdapter.execute(RESTCatalogAdapter.java:569)
            org.apache.iceberg.rest.RESTCatalogAdapter.execute(RESTCatalogAdapter.java:550)
            org.apache.iceberg.rest.RESTCatalogServlet.execute(RESTCatalogServlet.java:110)
            org.apache.iceberg.rest.RESTCatalogServlet.doPost(RESTCatalogServlet.java:79)
            jakarta.servlet.http.HttpServlet.service(HttpServlet.java:520)
            jakarta.servlet.http.HttpServlet.service(HttpServlet.java:587)
    ```
---
 .../org/apache/iceberg/rest/CatalogHandlers.java   | 58 ++++++++++++----------
 1 file changed, 32 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
index e3eff1228a..229497576a 100644
--- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
+++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
@@ -23,6 +23,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAUL
 import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT;
 import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;
 
+import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.time.OffsetDateTime;
 import java.time.ZoneOffset;
 import java.util.Collections;
@@ -64,6 +66,7 @@ import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.NoSuchViewException;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -805,35 +808,38 @@ public class CatalogHandlers {
    */
   private static Pair<List<FileScanTask>, String> planFilesFor(
       Scan<?, FileScanTask, ?> scan, String planId, String tableId, int tasksPerPlanTask) {
-    Iterable<FileScanTask> planTasks = scan.planFiles();
-    String planTaskPrefix = planId + "-" + tableId + "-";
-
-    // Handle empty table scans
-    if (!planTasks.iterator().hasNext()) {
-      String planTaskKey = planTaskPrefix + "0";
-      // Add empty scan to planning state so async calls know the scan completed
-      IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList());
-      return Pair.of(Collections.emptyList(), planTaskKey);
-    }
-
-    Iterable<List<FileScanTask>> taskGroupings = Iterables.partition(planTasks, tasksPerPlanTask);
-    int planTaskSequence = 0;
-    String previousPlanTask = null;
-    String firstPlanTaskKey = null;
-    List<FileScanTask> initialFileScanTasks = null;
-    for (List<FileScanTask> taskGrouping : taskGroupings) {
-      String planTaskKey = planTaskPrefix + planTaskSequence++;
-      IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
-      if (previousPlanTask != null) {
-        IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
-      } else {
-        firstPlanTaskKey = planTaskKey;
-        initialFileScanTasks = taskGrouping;
+    try (CloseableIterable<FileScanTask> planTasks = scan.planFiles()) {
+      String planTaskPrefix = planId + "-" + tableId + "-";
+
+      // Handle empty table scans
+      if (!planTasks.iterator().hasNext()) {
+        String planTaskKey = planTaskPrefix + "0";
+        // Add empty scan to planning state so async calls know the scan completed
+        IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList());
+        return Pair.of(Collections.emptyList(), planTaskKey);
       }
 
-      previousPlanTask = planTaskKey;
+      Iterable<List<FileScanTask>> taskGroupings = Iterables.partition(planTasks, tasksPerPlanTask);
+      int planTaskSequence = 0;
+      String previousPlanTask = null;
+      String firstPlanTaskKey = null;
+      List<FileScanTask> initialFileScanTasks = null;
+      for (List<FileScanTask> taskGrouping : taskGroupings) {
+        String planTaskKey = planTaskPrefix + planTaskSequence++;
+        IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping);
+        if (previousPlanTask != null) {
+          IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey);
+        } else {
+          firstPlanTaskKey = planTaskKey;
+          initialFileScanTasks = taskGrouping;
+        }
+
+        previousPlanTask = planTaskKey;
+      }
+      return Pair.of(initialFileScanTasks, firstPlanTaskKey);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close planFiles() iterable", e);
     }
-    return Pair.of(initialFileScanTasks, firstPlanTaskKey);
   }
 
   @SuppressWarnings("FutureReturnValueIgnored")

Reply via email to