mengw15 opened a new issue, #5143: URL: https://github.com/apache/texera/issues/5143
### What happened?

[`IcebergUtil.readDataFileAsIterator`](https://github.com/apache/texera/blob/a820f6727/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala#L414-L433) builds a `CloseableIterable[Record]` from `Parquet.read(...).build()` but returns only `.iterator().asScala`:

```scala
def readDataFileAsIterator(...): Iterator[Record] = {
  ...
  val closeableIterable: CloseableIterable[Record] =
    Parquet.read(inputFile).project(schema).createReaderFunc(readerFunc).build()
  closeableIterable.iterator().asScala // ← parent ref dropped
}
```

The `closeableIterable` local goes out of scope when the method returns. The caller receives a bare `scala.collection.Iterator[Record]` with no `close()` and no reference to its parent, so the underlying Parquet reader / `S3InputStream` can never be released.

**Symptom chain under `S3FileIO`:**

1. Every read leaks one `S3InputStream`. It stays open until GC because nothing in the call graph can close it.
2. Iceberg-AWS [`S3InputStream.finalize()`](https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java#L296) logs `WARN S3InputStream - Unclosed input stream created by: ...` when the GC'd stream is finalized, flooding the amber stdout.
3. The leaked stream had already borrowed one slot from the AWS SDK's `ApacheHttpClient` connection pool. The pool stays at its default size of **50** because Texera does not override it: `grep -rn maxConnections` across `common/`, `amber/`, and `computing-unit-managing-service/` returns zero hits.
4. After ~50 leaked reads the pool saturates, and new S3 reads block indefinitely on `acquireConnection`. **JVM restart is the only known recovery.**
5. `IcebergCatalogInstance` ([`IcebergCatalogInstance.scala`](https://github.com/apache/texera/blob/a820f6727/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala)) is a process-lifetime singleton that is never recreated, so leaked pool slots accumulate over the JVM lifetime.
6. `ComputingUnitManagingResource @DELETE /{cuid}/terminate` only deletes the K8s pod and writes a DB timestamp. In-flight Iceberg readers are not closed, and the catalog / `S3FileIO` is left untouched. Frequent CU delete/recreate cycles amplify the leak.

The same anti-pattern recurs in [`IcebergDocument.scala`](https://github.com/apache/texera/blob/a820f6727/common/workflow-core/src/main/scala/org/apache/amber/core/storage/result/iceberg/IcebergDocument.scala) at lines 128, 207-219, 339, and 462: `table.newScan().planFiles().iterator()` similarly drops the `CloseableIterable<FileScanTask>`.

**Fix sketch:**

- Return an `AutoCloseable` wrapper that owns both the iterator and the parent `CloseableIterable`, and expose `close()` to the caller.
- Propagate the close handle through `IcebergDocument`'s public read methods (`Using.resource(...)` or equivalent at every call site).
- Optionally, set `s3.connection-pool-max` on the catalog properties as defense in depth.

### How to reproduce?
Add this ScalaTest spec at `common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilLeakSpec.scala`:

```scala
package org.apache.amber.util

import org.apache.iceberg.io.{CloseableIterable, CloseableIterator}
import org.scalatest.flatspec.AnyFlatSpec

import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._

class IcebergUtilLeakSpec extends AnyFlatSpec {
  it should "close the parent CloseableIterable after the iterator is consumed" in {
    val parentCloseCount = new AtomicInteger(0)
    val ci: CloseableIterable[Int] = new CloseableIterable[Int] {
      override def iterator(): CloseableIterator[Int] = {
        val backing = java.util.Arrays.asList(1, 2, 3).iterator()
        new CloseableIterator[Int] {
          override def hasNext: Boolean = backing.hasNext
          override def next(): Int = backing.next()
          override def close(): Unit = ()
        }
      }
      override def close(): Unit = parentCloseCount.incrementAndGet()
    }

    // Exercise the exact leaky pattern from IcebergUtil.scala:426-432
    val leakyEscape: Iterator[Int] = ci.iterator().asScala
    leakyEscape.toList

    assert(
      parentCloseCount.get() >= 1,
      s"Parent CloseableIterable was never closed (close count = ${parentCloseCount.get()})."
    )
  }
}
```

Run:

```
sbt "WorkflowCore/testOnly org.apache.amber.util.IcebergUtilLeakSpec"
```

Output on current `master`:

```
[info] IcebergUtilLeakSpec:
[info] - should close the parent CloseableIterable after the iterator is consumed *** FAILED ***
[info] 0 was not greater than or equal to 1 Parent CloseableIterable was never closed (close count = 0).
[info] *** 1 TEST FAILED ***
```

The test exercises the exact anti-pattern (`CloseableIterable.iterator().asScala` with the parent reference dropped) without needing real Parquet / S3, so it reproduces the structural defect deterministically. The user-visible `WARN S3InputStream - Unclosed input stream created by: ...` flood and the eventual stall come from running the buggy method under `S3FileIO` repeatedly until the bounded Apache HTTP pool saturates.
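The first item of the **Fix sketch** could look roughly like the following. This is a minimal sketch under stated assumptions, not Texera code: the class name `ClosingIterator` and the demo object are hypothetical. It relies only on Iceberg's `CloseableIterable<T>` extending both `java.lang.Iterable<T>` and `java.io.Closeable`, so the value returned by `Parquet.read(...).build()` could presumably be passed in directly. The demo uses a stub parent with a close counter in place of a real Parquet reader, mirroring the spec above.

```scala
import java.io.Closeable
import java.util.concurrent.atomic.AtomicInteger
import scala.util.Using

/** Hypothetical wrapper (illustrative name): owns the parent iterable and closes it
  * exactly once, either when the iterator is drained or when close() is called. */
final class ClosingIterator[T](parent: java.lang.Iterable[T] with Closeable)
    extends Iterator[T]
    with AutoCloseable {

  private val underlying: java.util.Iterator[T] = parent.iterator()
  private var closed = false

  override def hasNext: Boolean =
    if (closed) false
    else {
      val more = underlying.hasNext
      if (!more) close() // drained: release the parent (e.g. the Parquet reader) eagerly
      more
    }

  override def next(): T = {
    if (!hasNext) throw new NoSuchElementException("iterator is exhausted or closed")
    underlying.next()
  }

  override def close(): Unit =
    if (!closed) {
      closed = true
      try {
        // Iceberg's CloseableIterable.iterator() returns a CloseableIterator; close it if so.
        underlying match {
          case c: AutoCloseable => c.close()
          case _                => ()
        }
      } finally parent.close() // release the parent even if closing the child throws
    }
}

object ClosingIteratorDemo {
  def main(args: Array[String]): Unit = {
    val parentCloseCount = new AtomicInteger(0)
    // Stand-in for a CloseableIterable[Record]; counts how often close() is called.
    val parent = new java.lang.Iterable[Int] with Closeable {
      override def iterator(): java.util.Iterator[Int] =
        java.util.Arrays.asList(1, 2, 3).iterator()
      override def close(): Unit = { parentCloseCount.incrementAndGet(); () }
    }

    // Full drain: closed on exhaustion, so Using's close() is a no-op.
    val sum = Using.resource(new ClosingIterator(parent))(it => it.sum)
    println(s"sum=$sum, parent closes=${parentCloseCount.get()}") // prints "sum=6, parent closes=1"

    // Early exit after one element: Using still closes the parent.
    val first = Using.resource(new ClosingIterator(parent))(it => it.next())
    println(s"first=$first, parent closes=${parentCloseCount.get()}") // prints "first=1, parent closes=2"
  }
}
```

Closing on exhaustion means a fully drained iterator releases its stream even if a caller forgets `Using`, while `Using.resource` covers early exits and exceptions. Because `close()` is idempotent, both paths can coexist. The `planFiles()` call sites in `IcebergDocument` could plausibly be handled the same way, either by wrapping the `CloseableIterable<FileScanTask>` in such a class or by holding it in `Using.resource` directly, since it is itself `Closeable`.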
### Version

1.1.0-incubating (Pre-release/Master)
