mengw15 opened a new issue, #5143:
URL: https://github.com/apache/texera/issues/5143

   ### What happened?
   
   
   [`IcebergUtil.readDataFileAsIterator`](https://github.com/apache/texera/blob/a820f6727/common/workflow-core/src/main/scala/org/apache/amber/util/IcebergUtil.scala#L414-L433) builds a `CloseableIterable[Record]` from `Parquet.read(...).build()` but returns only `.iterator().asScala`:
   
   ```scala
   def readDataFileAsIterator(...): Iterator[Record] = {
     ...
     val closeableIterable: CloseableIterable[Record] =
       Parquet.read(inputFile).project(schema).createReaderFunc(readerFunc).build()
     closeableIterable.iterator().asScala     // ← parent ref dropped
   }
   ```
   
   The `closeableIterable` local goes out of scope when the method returns. The caller receives a bare `scala.collection.Iterator[Record]` with no `close()` and no reference to its parent, so nothing can ever explicitly close the underlying Parquet reader / `S3InputStream`.
   
   **Symptom chain under `S3FileIO`:**
   
   1. Every read leaks one `S3InputStream`: it stays open until GC, because nothing in the call graph can close it.
   2. Iceberg-AWS [`S3InputStream.finalize()`](https://github.com/apache/iceberg/blob/apache-iceberg-1.7.1/aws/src/main/java/org/apache/iceberg/aws/s3/S3InputStream.java#L296) logs `WARN  S3InputStream - Unclosed input stream created by: ...` whenever a leaked stream is finalized, flooding Amber's stdout.
   3. Each leaked stream has already borrowed one slot from the AWS SDK's `ApacheHttpClient` connection pool (default **50**; Texera does not override it: `grep -rn maxConnections` across `common/`, `amber/`, and `computing-unit-managing-service/` returns zero hits).
   4. After ~50 leaked reads the pool saturates, and new S3 reads block indefinitely on `acquireConnection`. **JVM restart is the only known recovery.**
   5. `IcebergCatalogInstance` ([`IcebergCatalogInstance.scala`](https://github.com/apache/texera/blob/a820f6727/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/IcebergCatalogInstance.scala)) is a process-lifetime singleton that is never recreated, so leaked pool slots accumulate over the JVM lifetime.
   6. `ComputingUnitManagingResource @DELETE /{cuid}/terminate` only deletes the K8s pod and writes a DB timestamp: in-flight Iceberg readers are not closed, and the catalog / `S3FileIO` is left untouched. Frequent CU delete/recreate cycles amplify the leak.
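
Steps 3 and 4 can be illustrated with a toy model. This sketch is not the AWS SDK: a `java.util.concurrent.Semaphore` with 50 permits stands in for the `ApacheHttpClient` pool, and `ToyConnectionPool` / `ToyPoolDemo` are hypothetical names. A read that closes its handle returns its slot; a leaked read never does, so once 50 handles have leaked, the next read cannot get a slot. The toy uses a timeout instead of blocking forever so it can be run.

```scala
import java.util.concurrent.{Semaphore, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Toy model only (NOT the AWS SDK): a bounded pool with `maxConnections` slots.
// Borrowing a slot stands in for opening an S3InputStream; closing the handle
// stands in for closing the stream and returning its HTTP connection.
final class ToyConnectionPool(maxConnections: Int) {
  private val slots = new Semaphore(maxConnections)

  /** Borrow a slot, giving up after `timeoutMs` (the real pool may wait much longer). */
  def tryBorrow(timeoutMs: Long): Option[AutoCloseable] =
    if (slots.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS)) {
      val released = new AtomicBoolean(false)
      // Idempotent release: the slot is returned at most once.
      Some(new AutoCloseable {
        override def close(): Unit =
          if (released.compareAndSet(false, true)) slots.release()
      })
    } else None

  def available: Int = slots.availablePermits()
}

object ToyPoolDemo {
  /** Leak `leaks` handles (never closed), then check whether one more read gets a slot. */
  def nextReadSucceedsAfterLeaks(maxConnections: Int, leaks: Int): Boolean = {
    val pool = new ToyConnectionPool(maxConnections)
    (1 to leaks).foreach(_ => pool.tryBorrow(10L)) // handle dropped: slot never returned
    pool.tryBorrow(50L) match {
      case Some(handle) => handle.close(); true
      case None         => false
    }
  }

  /** Every read closes its handle, so the pool never runs dry. */
  def closedReadsAllSucceed(maxConnections: Int, reads: Int): Boolean = {
    val pool = new ToyConnectionPool(maxConnections)
    (1 to reads).forall { _ =>
      pool.tryBorrow(10L) match {
        case Some(handle) => handle.close(); true
        case None         => false
      }
    }
  }
}
```

With a 50-slot pool, 49 leaks still leave room for the next read, 50 leaks do not, and any number of properly closed reads never exhausts the pool.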
   
   The same anti-pattern recurs in [`IcebergDocument.scala`](https://github.com/apache/texera/blob/a820f6727/common/workflow-core/src/main/scala/org/apache/amber/core/storage/result/iceberg/IcebergDocument.scala) at lines 128, 207-219, 339, and 462: `table.newScan().planFiles().iterator()` similarly drops the parent `CloseableIterable<FileScanTask>`.
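
For the `planFiles()` call sites, one option is a loan-pattern helper built on `scala.util.Using`. This is a sketch, not existing Texera code; `ScanTasks.withEach` is a hypothetical name. It accepts any `java.lang.Iterable[T] with java.io.Closeable`, which Iceberg's `CloseableIterable[T]` satisfies, so `table.newScan().planFiles()` could be passed in directly.

```scala
import scala.jdk.CollectionConverters._
import scala.util.Using

object ScanTasks {
  /**
   * Hypothetical loan-pattern helper: hands the elements of `tasks` to `f`, then
   * closes `tasks` when `f` returns or throws.
   *
   * `f` must finish consuming the iterator before it returns. Returning the lazy
   * iterator itself would hand out one whose parent is already closed.
   */
  def withEach[T, A](tasks: java.lang.Iterable[T] with java.io.Closeable)(f: Iterator[T] => A): A =
    Using.resource(tasks)(t => f(t.iterator().asScala))
}
```

At a call site, `ScanTasks.withEach(table.newScan().planFiles())(_.toList)` would materialize the tasks and close the parent iterable before returning, even if the body throws.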
   
   **Fix sketch:**
   - Return an `AutoCloseable` wrapper that owns both the iterator and the parent `CloseableIterable`, and expose `close()` to the caller.
   - Propagate the close handle through `IcebergDocument`'s public read methods (`Using.resource(...)` or equivalent at every call site).
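   - Optionally raise the Apache HTTP client pool size in the catalog properties (Iceberg-AWS reads it from `http-client.apache.max-connections`) as defense in depth. This only delays saturation; it does not fix the leak.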
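
A minimal sketch of the first bullet, using a hypothetical `ClosingIterator` class (not an existing Texera or Iceberg API). It remains a plain Scala `Iterator[T]`, so existing consumers keep working, but it also implements `AutoCloseable` and keeps the parent reachable, so `close()` can release the Parquet reader. It accepts any `java.lang.Iterable[T] with java.io.Closeable`, which Iceberg's `CloseableIterable[T]` satisfies.

```scala
// Hypothetical wrapper (not an existing Texera or Iceberg API): a Scala Iterator
// that keeps its parent reachable so callers can release the underlying reader.
final class ClosingIterator[T] private (
    underlying: java.util.Iterator[T],
    parent: AutoCloseable
) extends Iterator[T]
    with AutoCloseable {

  // Not thread-safe, like most iterators.
  private var closed = false

  override def hasNext: Boolean = !closed && underlying.hasNext

  override def next(): T = {
    if (closed) throw new NoSuchElementException("iterator already closed")
    underlying.next()
  }

  /** Idempotent: the parent (e.g. the Parquet reader) is closed at most once. */
  override def close(): Unit =
    if (!closed) {
      closed = true
      parent.close()
    }
}

object ClosingIterator {
  /** Accepts anything that is both Iterable and Closeable, e.g. Iceberg's CloseableIterable[T]. */
  def apply[T](parent: java.lang.Iterable[T] with java.io.Closeable): ClosingIterator[T] = {
    val it =
      try parent.iterator()
      catch {
        case e: Throwable =>
          parent.close() // do not leak the parent if opening the iterator fails
          throw e
      }
    new ClosingIterator(it, parent)
  }
}
```

`readDataFileAsIterator` could then return `ClosingIterator(closeableIterable)` instead of `closeableIterable.iterator().asScala`, and callers would consume it inside `Using.resource(IcebergUtil.readDataFileAsIterator(...)) { records => ... }`, which closes the reader even if the consumer throws or stops early.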
   
   ### How to reproduce?
   
   Add this ScalaTest spec at `common/workflow-core/src/test/scala/org/apache/amber/util/IcebergUtilLeakSpec.scala`:
   
   ```scala
   package org.apache.amber.util
   
   import org.apache.iceberg.io.{CloseableIterable, CloseableIterator}
   import org.scalatest.flatspec.AnyFlatSpec
   import java.util.concurrent.atomic.AtomicInteger
   import scala.jdk.CollectionConverters._
   
   class IcebergUtilLeakSpec extends AnyFlatSpec {
   
     it should "close the parent CloseableIterable after the iterator is consumed" in {
       val parentCloseCount = new AtomicInteger(0)
   
       val ci: CloseableIterable[Int] = new CloseableIterable[Int] {
         override def iterator(): CloseableIterator[Int] = {
           val backing = java.util.Arrays.asList(1, 2, 3).iterator()
           new CloseableIterator[Int] {
             override def hasNext: Boolean = backing.hasNext
             override def next(): Int      = backing.next()
             override def close(): Unit    = ()
           }
         }
         override def close(): Unit = parentCloseCount.incrementAndGet()
       }
   
       // Exercise the exact leaky pattern from IcebergUtil.scala:426-432
       val leakyEscape: Iterator[Int] = ci.iterator().asScala
       leakyEscape.toList
   
       assert(parentCloseCount.get() >= 1,
         s"Parent CloseableIterable was never closed (close count = ${parentCloseCount.get()}).")
     }
   }
   ```
   
   Run:
   ```
   sbt "WorkflowCore/testOnly org.apache.amber.util.IcebergUtilLeakSpec"
   ```
   
   Output on current `master`:
   ```
   [info] IcebergUtilLeakSpec:
   [info] - should close the parent CloseableIterable after the iterator is consumed *** FAILED ***
   [info]   0 was not greater than or equal to 1 Parent CloseableIterable was never closed (close count = 0).
   [info] *** 1 TEST FAILED ***
   ```
   
   The test exercises the exact anti-pattern (`CloseableIterable.iterator().asScala` with the parent reference dropped) without needing real Parquet / S3, so it reproduces the structural defect deterministically. The user-visible `WARN S3InputStream - Unclosed input stream created by: ...` flood and the eventual stall come from running the buggy method under `S3FileIO` repeatedly until the bounded Apache HTTP pool saturates.
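
The spec's assertion (parent closed once the escaped iterator has been drained with `.toList`) would hold if the escaping iterator released its parent on exhaustion. `AutoClosingIterator` below is a hypothetical variant (not Texera code) that does exactly that. Iterators that are only partly consumed (for example via `take(n)` or an exception mid-scan) never reach exhaustion, so this complements the `AutoCloseable` wrapper from the fix sketch rather than replacing it.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical variant (not Texera code): closes `parent` the first time the
// underlying iterator reports exhaustion, so simply draining it releases the
// reader. close() is still exposed for callers that stop early.
final class AutoClosingIterator[T](underlying: java.util.Iterator[T], parent: AutoCloseable)
    extends Iterator[T]
    with AutoCloseable {

  private val closed = new AtomicBoolean(false)

  override def hasNext: Boolean =
    if (closed.get()) false
    else if (underlying.hasNext) true
    else {
      close() // exhausted: release the parent eagerly
      false
    }

  override def next(): T =
    if (hasNext) underlying.next()
    else throw new NoSuchElementException("iterator exhausted or closed")

  /** Idempotent: the parent is closed at most once. */
  override def close(): Unit =
    if (closed.compareAndSet(false, true)) parent.close()
}
```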
   
   ### Version
   
   1.1.0-incubating (Pre-release/Master)

