This is an automated email from the ASF dual-hosted git repository. etudenhoefner pushed a commit to branch close-planfiles-iterable in repository https://gitbox.apache.org/repos/asf/iceberg.git
commit 1270b7270020706efdc0aa84589b9b50bbc36f4e Author: Eduard Tudenhoefner <[email protected]> AuthorDate: Fri Dec 19 12:18:15 2025 +0100 Core: Close planFiles() iterable Noticed the below stack trace when running `TestRemoteScanPlanning`: ``` [Finalizer] WARN org.apache.iceberg.hadoop.HadoopStreams - Unclosed input stream created by: org.apache.iceberg.hadoop.HadoopStreams$HadoopSeekableInputStream.<init>(HadoopStreams.java:90) org.apache.iceberg.hadoop.HadoopStreams.wrap(HadoopStreams.java:54) org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183) org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:103) org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:78) org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205) org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204) org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205) org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204) org.apache.iceberg.io.CloseableIterable.lambda$filter$0(CloseableIterable.java:126) org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:89) org.apache.iceberg.io.CloseableIterable.lambda$filter$1(CloseableIterable.java:153) org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:89) org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205) org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204) org.apache.iceberg.ManifestGroup$1.iterator(ManifestGroup.java:347) org.apache.iceberg.io.CloseableIterable$ConcatCloseableIterable$ConcatCloseableIterator.hasNext(CloseableIterable.java:274) org.apache.iceberg.rest.CatalogHandlers.planFilesFor(CatalogHandlers.java:813) org.apache.iceberg.rest.CatalogHandlers.planTableScan(CatalogHandlers.java:690) org.apache.iceberg.rest.RESTCatalogAdapter.handleRequest(RESTCatalogAdapter.java:321) org.apache.iceberg.rest.RESTServerCatalogAdapter.handleRequest(RESTServerCatalogAdapter.java:47) org.apache.iceberg.rest.RESTCatalogAdapter.execute(RESTCatalogAdapter.java:569) org.apache.iceberg.rest.RESTCatalogAdapter.execute(RESTCatalogAdapter.java:550) org.apache.iceberg.rest.RESTCatalogServlet.execute(RESTCatalogServlet.java:110) org.apache.iceberg.rest.RESTCatalogServlet.doPost(RESTCatalogServlet.java:79) jakarta.servlet.http.HttpServlet.service(HttpServlet.java:520) jakarta.servlet.http.HttpServlet.service(HttpServlet.java:587) ``` --- .../org/apache/iceberg/rest/CatalogHandlers.java | 57 ++++++++++++---------- 1 file changed, 31 insertions(+), 26 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index e3eff1228a..229497576a 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -23,6 +23,7 @@ import static org.apache.iceberg.TableProperties.COMMIT_MIN_RETRY_WAIT_MS_DEFAUL import static org.apache.iceberg.TableProperties.COMMIT_NUM_RETRIES_DEFAULT; import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; +import java.io.IOException; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Collections; @@ -64,6 +65,7 @@ import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.NoSuchNamespaceException; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; +import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -805,35 +807,38 @@ public class CatalogHandlers { */ private static Pair<List<FileScanTask>, String> planFilesFor( Scan<?, FileScanTask, ?> scan, String planId, String tableId, int tasksPerPlanTask) { - Iterable<FileScanTask> planTasks = scan.planFiles(); - String planTaskPrefix = planId + "-" + tableId + "-"; - - // Handle empty table scans - if (!planTasks.iterator().hasNext()) { - String planTaskKey = planTaskPrefix + "0"; - // Add empty scan to planning state so async calls know the scan completed - IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList()); - return Pair.of(Collections.emptyList(), planTaskKey); - } - - Iterable<List<FileScanTask>> taskGroupings = Iterables.partition(planTasks, tasksPerPlanTask); - int planTaskSequence = 0; - String previousPlanTask = null; - String firstPlanTaskKey = null; - List<FileScanTask> initialFileScanTasks = null; - for (List<FileScanTask> taskGrouping : taskGroupings) { - String planTaskKey = planTaskPrefix + planTaskSequence++; - IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping); - if (previousPlanTask != null) { - IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey); - } else { - firstPlanTaskKey = planTaskKey; - initialFileScanTasks = taskGrouping; + try (CloseableIterable<FileScanTask> planTasks = scan.planFiles()) { + String planTaskPrefix = planId + "-" + tableId + "-"; + + // Handle empty table scans + if (!planTasks.iterator().hasNext()) { + String planTaskKey = planTaskPrefix + "0"; + // Add empty scan to planning state so async calls know the scan completed + IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, Collections.emptyList()); + return Pair.of(Collections.emptyList(), planTaskKey); } - previousPlanTask = planTaskKey; + Iterable<List<FileScanTask>> taskGroupings = Iterables.partition(planTasks, tasksPerPlanTask); + int planTaskSequence = 0; + String previousPlanTask = null; + String firstPlanTaskKey = null; + List<FileScanTask> initialFileScanTasks = null; + for (List<FileScanTask> taskGrouping : taskGroupings) { + String planTaskKey = planTaskPrefix + planTaskSequence++; + IN_MEMORY_PLANNING_STATE.addPlanTask(planTaskKey, taskGrouping); + if (previousPlanTask != null) { + IN_MEMORY_PLANNING_STATE.addNextPlanTask(previousPlanTask, planTaskKey); + } else { + firstPlanTaskKey = planTaskKey; + initialFileScanTasks = taskGrouping; + } + + previousPlanTask = planTaskKey; + } + return Pair.of(initialFileScanTasks, firstPlanTaskKey); + } catch (IOException e) { + throw new RuntimeException(e); } - return Pair.of(initialFileScanTasks, firstPlanTaskKey); } @SuppressWarnings("FutureReturnValueIgnored")
