This is an automated email from the ASF dual-hosted git repository.
github-merge-queue[bot] pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/texera.git
The following commit(s) were added to refs/heads/main by this push:
new 5e56956860 fix: close CloseableIterable owners in Iceberg read paths (#5149)
5e56956860 is described below
commit 5e569568606a204070040b1521210cc9d853bc10
Author: Meng Wang <[email protected]>
AuthorDate: Sat May 23 00:04:20 2026 -0700
fix: close CloseableIterable owners in Iceberg read paths (#5149)
### What changes were proposed in this PR?
Fixes a resource leak in `IcebergUtil.readDataFileAsIterator` and five
sibling sites in `IcebergDocument` that share the same anti-pattern:
```scala
closeableIterable.iterator().asScala
```
The bare `Iterator` returned to callers held no reference to its parent
`CloseableIterable`, so the parent could never be closed. Under
`S3FileIO`:
1. Every read leaked one `S3InputStream` (kept open until GC because
nothing in the call graph could close it).
2. The leaked stream had already borrowed one slot from the AWS SDK's
`ApacheHttpClient` connection pool (default **50**; Texera does not
override it).
3. After ~50 leaked reads the pool may saturate; new S3 reads then block
on `acquireConnection` until JVM restart.
This PR:
- Introduces `CloseableScalaIterator[T]` (`Iterator[T] with
AutoCloseable`, idempotent `close()`) in `IcebergUtil`, which wraps a
`CloseableIterable[T]` and propagates `close()` to the parent.
- Changes `IcebergUtil.readDataFileAsIterator` to return
`CloseableScalaIterator[Record]` instead of bare `Iterator[Record]`.
Callers must now close it (e.g. via `Using.resource`).
- Updates the single caller in `IcebergDocument`'s read iterator to
track the close handle in a sibling `AutoCloseable` field
(`currentRecordIteratorCloser`) and close it on file-switch, on
exhaustion, and when the caller-imposed `until` cap is reached. The
sibling field is necessary because `Iterator.drop(n)` returns a bare
iterator that loses the wrapper type.
- Wraps the four eagerly-consumed `planFiles()` call sites — `getCount`,
`seekToUsableFile`, `getTableStatistics`, `asInputStream` — in
`Using.resource` so the metadata-side `CloseableIterable<FileScanTask>`
is closed promptly.
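A minimal sketch of the wrapper idea described in the list above, using only the standard library. `OwnedIterable`, `ClosingIterator`, and `CountingIterable` are hypothetical stand-ins (for Iceberg's `CloseableIterable` and for the wrapper this description names); the diff in this commit returns Iceberg's own `CloseableIterator`, so treat this as an illustration of the technique rather than the merged code:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.jdk.CollectionConverters._
import scala.util.Using

// Hypothetical stand-in for an iterable that owns a resource (e.g. an open stream).
trait OwnedIterable[T] extends java.lang.Iterable[T] with AutoCloseable

// Scala iterator that keeps a reference to its parent so close() can reach it.
final class ClosingIterator[T](parent: OwnedIterable[T])
    extends Iterator[T]
    with AutoCloseable {
  private val underlying = parent.iterator().asScala
  private val closed = new AtomicBoolean(false)

  override def hasNext: Boolean = underlying.hasNext
  override def next(): T = underlying.next()

  // Idempotent: only the first call is propagated to the parent.
  override def close(): Unit =
    if (closed.compareAndSet(false, true)) parent.close()
}

object ClosingIteratorDemo {
  // Test double that counts how many times close() reaches the parent.
  final class CountingIterable(items: Seq[Int]) extends OwnedIterable[Int] {
    var closeCount = 0
    override def iterator(): java.util.Iterator[Int] = items.iterator.asJava
    override def close(): Unit = closeCount += 1
  }

  // Using.resource closes the wrapper (and thus the parent) even if the body throws.
  def sumAndClose(source: CountingIterable): Int =
    Using.resource(new ClosingIterator(source))(_.sum)
}
```

With this shape, `closeableIterable.iterator().asScala` is replaced by a value that the caller can hand to `Using.resource`, and a second `close()` on the same wrapper is a no-op.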
**Known limitation (out of scope here):** if a caller of
`IcebergDocument.get()` / `getRange()` / `getAfter()` stops iterating
before `hasNext` returns `false` (e.g. throws mid-loop, or calls
`.take(n)` and then drops the result), the LAST file's
`CloseableScalaIterator` will leak until JVM GC. Fixing this requires
changing the public `Iterator[T]` return type on `VirtualDocument` to
`Iterator[T] with AutoCloseable` and updating all callers — best done as
a separate refactor.
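The limitation can be reproduced in isolation. The sketch below assumes a hypothetical `CloseOnExhaustion` iterator that releases its resource only when `hasNext` first returns `false`, mirroring the close-on-exhaustion path; it is an illustration, not code from this PR:

```scala
// Hypothetical iterator: releases its resource only once hasNext reports false.
final class CloseOnExhaustion[T](items: Iterator[T], release: () => Unit)
    extends Iterator[T] {
  private var released = false
  def isReleased: Boolean = released

  override def hasNext: Boolean = {
    val more = items.hasNext
    if (!more && !released) {
      released = true
      release()
    }
    more
  }

  override def next(): T = items.next()
}

object EarlyStopDemo {
  // Reads only the first `n` elements, then abandons the iterator.
  def firstN(it: CloseOnExhaustion[Int], n: Int): List[Int] = it.take(n).toList
}
```

Calling `EarlyStopDemo.firstN(it, 2)` on a three-element source returns without `release` ever running, because the underlying `hasNext` is never asked for the element past the cap. Covering that case needs an explicit `close()` that the caller can invoke, which is why the fix depends on changing the public return type.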
### Any related issues, documentation, discussions?
Closes #5143.
### How was this PR tested?
- Added `IcebergUtilLeakSpec` (2 cases): validates that
`CloseableScalaIterator` (a) closes its parent `CloseableIterable` when
used inside `Using.resource`, and (b) is idempotent under repeated
`close()` calls.
- All existing Iceberg specs still pass:
- `IcebergUtilSpec`: 14/14
- `IcebergUtilLeakSpec`: 2/2 (new)
- `IcebergDocumentSpec`: 18/18 (exercises the modified read iterator's
close-on-reassign / close-on-exhaustion paths against real Iceberg
infrastructure)
- `IcebergTableStatsSpec`: 12/12 (exercises `getTableStatistics` with
the new `Using.resource` wrap)
- `IcebergDocumentConsoleMessagesSpec`: 1/1
Run locally:
```
sbt "WorkflowCore/testOnly org.apache.texera.amber.util.IcebergUtilSpec
org.apache.texera.amber.util.IcebergUtilLeakSpec
org.apache.texera.amber.storage.result.iceberg.*"
```
Result: `47 succeeded, 0 failed`.
### Was this PR authored or co-authored using generative AI tooling?
Generated-by: Claude Code (claude-opus-4-7)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../storage/result/iceberg/IcebergDocument.scala | 245 ++++++++++++---------
.../org/apache/texera/amber/util/IcebergUtil.scala | 27 ++-
.../result/iceberg/IcebergDocumentSpec.scala | 21 ++
3 files changed, 175 insertions(+), 118 deletions(-)
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
index e10152cdae..182da2baac 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala
@@ -125,7 +125,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
.getOrElse(
return 0
)
- table.newScan().planFiles().iterator().asScala.map(f => f.file().recordCount()).sum
+ Using.resource(table.newScan().planFiles()) { tasks =>
+ tasks.iterator().asScala.map(f => f.file().recordCount()).sum
+ }
}
/**
@@ -178,8 +180,11 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
// Iterator for usable file scan tasks
private var usableFileIterator: Iterator[FileScanTask] = seekToUsableFile()
- // Current record iterator for the active file
+ // Active file's records. Closer tracked separately because the
+ // `.drop` call below returns a bare `Iterator[Record]` that loses
+ // the wrapper type.
private var currentRecordIterator: Iterator[Record] = Iterator.empty
+ private var currentRecordIteratorCloser: AutoCloseable = () => ()
// Util function to load the table's metadata
private def loadTableMetadata(): Option[Table] = {
@@ -202,38 +207,43 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
}
table.foreach(_.refresh())
- // Retrieve and sort the file scan tasks by file sequence number
+ // Retrieve and sort the file scan tasks by file sequence number.
+ // Materialize inside `Using.resource` so the `planFiles()`
+ // CloseableIterable is released after collection.
val fileScanTasksIterator: Iterator[FileScanTask] = table match {
case Some(t) =>
val currentSnapshotId = Option(t.currentSnapshot()).map(_.snapshotId())
- val fileScanTasks = (lastSnapshotId, currentSnapshotId) match {
- // Read from the start
- case (None, Some(_)) =>
- val tasks = t.newScan().planFiles().iterator().asScala
- lastSnapshotId = currentSnapshotId
- tasks
-
- // Read incrementally from the last snapshot
- case (Some(lastId), Some(currId)) if lastId != currId =>
- val tasks = t
- .newIncrementalAppendScan()
- .fromSnapshotExclusive(lastId)
- .toSnapshot(currId)
- .planFiles()
- .iterator()
- .asScala
- lastSnapshotId = currentSnapshotId
- tasks
-
- // No new data
- case (Some(lastId), Some(currId)) if lastId == currId =>
- Iterator.empty
-
- // Default: No data yet
- case _ =>
- Iterator.empty
- }
- fileScanTasks.toSeq.sortBy(_.file().fileSequenceNumber()).iterator
+ val fileScanTasks: Seq[FileScanTask] =
+ (lastSnapshotId, currentSnapshotId) match {
+ // Read from the start
+ case (None, Some(_)) =>
+ val tasks = Using.resource(t.newScan().planFiles()) { ci =>
+ ci.iterator().asScala.toSeq
+ }
+ lastSnapshotId = currentSnapshotId
+ tasks
+
+ // Read incrementally from the last snapshot
+ case (Some(lastId), Some(currId)) if lastId != currId =>
+ val tasks = Using.resource(
+ t
+ .newIncrementalAppendScan()
+ .fromSnapshotExclusive(lastId)
+ .toSnapshot(currId)
+ .planFiles()
+ ) { ci => ci.iterator().asScala.toSeq }
+ lastSnapshotId = currentSnapshotId
+ tasks
+
+ // No new data
+ case (Some(lastId), Some(currId)) if lastId == currId =>
+ Seq.empty
+
+ // Default: No data yet
+ case _ =>
+ Seq.empty
+ }
+ fileScanTasks.sortBy(_.file().fileSequenceNumber()).iterator
case None =>
Iterator.empty
@@ -255,6 +265,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
override def hasNext: Boolean = {
if (numOfReturnedRecords >= totalRecordsToReturn) {
+ // Caller-imposed limit reached; release the active file's reader.
+ currentRecordIteratorCloser.close()
+ currentRecordIteratorCloser = () => ()
return false
}
@@ -274,11 +287,15 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
case Some(cols) => tableSchema.select(cols.asJava)
case None => tableSchema
}
- currentRecordIterator = IcebergUtil.readDataFileAsIterator(
+ // Release the prior file's reader before opening the next.
+ currentRecordIteratorCloser.close()
+ val nextIter = IcebergUtil.readDataFileAsIterator(
nextFile.file(),
schemaToUse,
table.get
)
+ currentRecordIteratorCloser = nextIter
+ currentRecordIterator = nextIter.asScala
// Skip records within the file if necessary
val recordsToSkipInFile = from - numOfSkippedRecords
@@ -288,7 +305,13 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
}
}
- currentRecordIterator.hasNext
+ val hasMore = currentRecordIterator.hasNext
+ if (!hasMore) {
+ // All files exhausted; release the last file's reader.
+ currentRecordIteratorCloser.close()
+ currentRecordIteratorCloser = () => ()
+ }
+ hasMore
}
override def next(): T = {
@@ -355,83 +378,89 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
}
// Scan table files and aggregate statistics
- table.newScan().includeColumnStats().planFiles().iterator().asScala.foreach { file =>
- val fileStats = file.file()
- // Extract column-level statistics
- val lowerBounds =
- Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
- val upperBounds =
- Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
- val nullCounts =
- Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
- val nanCounts =
- Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
-
- fieldTypes.foreach {
- case (field, (fieldId, fieldType)) =>
- val lowerBound = Option(lowerBounds.get(fieldId))
- val upperBound = Option(upperBounds.get(fieldId))
- val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
- val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
- val fieldStat = fieldStats(field)
-
- // Process min/max values for numerical types
- if (
- fieldType == Types.IntegerType.get() || fieldType == Types.LongType
- .get() || fieldType == Types.DoubleType.get()
- ) {
- lowerBound.foreach { buffer =>
- val minValue =
- Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
- fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue)
- }
+ Using.resource(table.newScan().includeColumnStats().planFiles()) { tasks =>
+ tasks.iterator().asScala.foreach { file =>
+ val fileStats = file.file()
+ // Extract column-level statistics
+ val lowerBounds =
+ Option(fileStats.lowerBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
+ val upperBounds =
+ Option(fileStats.upperBounds()).getOrElse(Map.empty[Integer, ByteBuffer].asJava)
+ val nullCounts =
+ Option(fileStats.nullValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
+ val nanCounts =
+ Option(fileStats.nanValueCounts()).getOrElse(Map.empty[Integer, java.lang.Long].asJava)
+
+ fieldTypes.foreach {
+ case (field, (fieldId, fieldType)) =>
+ val lowerBound = Option(lowerBounds.get(fieldId))
+ val upperBound = Option(upperBounds.get(fieldId))
+ val nullCount: Long = Option(nullCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
+ val nanCount: Long = Option(nanCounts.get(fieldId)).map(_.toLong).getOrElse(0L)
+ val fieldStat = fieldStats(field)
+
+ // Process min/max values for numerical types
+ if (
+ fieldType == Types.IntegerType.get() || fieldType == Types.LongType
+ .get() || fieldType == Types.DoubleType.get()
+ ) {
+ lowerBound.foreach { buffer =>
+ val minValue =
+ Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
+ fieldStat("min") = Math.min(fieldStat("min").asInstanceOf[Double], minValue)
+ }
- upperBound.foreach { buffer =>
- val maxValue =
- Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
- fieldStat("max") = Math.max(fieldStat("max").asInstanceOf[Double], maxValue)
- }
- }
- // Process min/max values for timestamp types
- else if (
- fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType
- .withZone()
- ) {
- lowerBound.foreach { buffer =>
- val epochMicros = Conversions
- .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
- .asInstanceOf[Long]
- val dateValue =
- Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
- fieldStat("min") =
- if (
- dateValue
- .isBefore(LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter))
- )
- dateValue.format(dateFormatter)
- else
- fieldStat("min")
+ upperBound.foreach { buffer =>
+ val maxValue =
+ Conversions.fromByteBuffer(fieldType, buffer).asInstanceOf[Number].doubleValue()
+ fieldStat("max") = Math.max(fieldStat("max").asInstanceOf[Double], maxValue)
+ }
}
+ // Process min/max values for timestamp types
+ else if (
+ fieldType == Types.TimestampType.withoutZone() || fieldType == Types.TimestampType
+ .withZone()
+ ) {
+ lowerBound.foreach { buffer =>
+ val epochMicros = Conversions
+ .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
+ .asInstanceOf[Long]
+ val dateValue =
+ Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
+ fieldStat("min") =
+ if (
+ dateValue
+ .isBefore(
+ LocalDate.parse(fieldStat("min").asInstanceOf[String], dateFormatter)
+ )
+ )
+ dateValue.format(dateFormatter)
+ else
+ fieldStat("min")
+ }
- upperBound.foreach { buffer =>
- val epochMicros = Conversions
- .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
- .asInstanceOf[Long]
- val dateValue =
- Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
- fieldStat("max") =
- if (
- dateValue
- .isAfter(LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter))
- )
- dateValue.format(dateFormatter)
- else
- fieldStat("max")
+ upperBound.foreach { buffer =>
+ val epochMicros = Conversions
+ .fromByteBuffer(Types.TimestampType.withoutZone(), buffer)
+ .asInstanceOf[Long]
+ val dateValue =
+ Instant.ofEpochMilli(epochMicros / 1000).atZone(ZoneOffset.UTC).toLocalDate
+ fieldStat("max") =
+ if (
+ dateValue
+ .isAfter(
+ LocalDate.parse(fieldStat("max").asInstanceOf[String], dateFormatter)
+ )
+ )
+ dateValue.format(dateFormatter)
+ else
+ fieldStat("max")
+ }
}
- }
- // Update non-null count
- fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] +
- (fileStats.recordCount().toLong - nullCount - nanCount)
+ // Update non-null count
+ fieldStat("not_null_count") = fieldStat("not_null_count").asInstanceOf[Long] +
+ (fileStats.recordCount().toLong - nullCount - nanCount)
+ }
}
}
fieldStats.map {
@@ -478,7 +507,9 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
val fileScanTasks: Seq[FileScanTask] = {
val table =
this.catalog.loadTable(TableIdentifier.of(this.tableNamespace, this.tableName))
table.refresh()
- table.newScan().planFiles().iterator().asScala.toSeq
+ Using.resource(table.newScan().planFiles()) { tasks =>
+ tasks.iterator().asScala.toSeq
+ }
}
if (fileScanTasks.isEmpty) {
diff --git a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
index cee293a758..0b45b9eec3 100644
--- a/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
+++ b/common/workflow-core/src/main/scala/org/apache/texera/amber/util/IcebergUtil.scala
@@ -27,7 +27,7 @@ import org.apache.iceberg.data.parquet.GenericParquetReaders
import org.apache.iceberg.data.{GenericRecord, Record}
import org.apache.iceberg.aws.s3.S3FileIO
import org.apache.iceberg.hadoop.{HadoopCatalog, HadoopFileIO}
-import org.apache.iceberg.io.{CloseableIterable, InputFile}
+import org.apache.iceberg.io.{CloseableIterator, InputFile}
import org.apache.iceberg.jdbc.JdbcCatalog
import org.apache.iceberg.parquet.{Parquet, ParquetValueReader}
import org.apache.iceberg.rest.RESTCatalog
@@ -404,17 +404,23 @@ object IcebergUtil {
}
/**
- * Util function to create a Record iterator over the given DataFile in Iceberg
+ * Returns a Record iterator over the given Iceberg DataFile.
+ *
+ * The returned `CloseableIterator` (Iceberg's iterator type) owns the
+ * underlying Parquet reader / S3InputStream / AWS HTTP-pool slot. The
+ * caller MUST close it once iteration is finished, otherwise those
+ * resources are leaked.
+ *
* @param dataFile the data file
* @param schema the schema of the table
* @param table the iceberg table
- * @return an iterator over the records in the data file
+ * @return a closeable iterator over the records in the data file
*/
def readDataFileAsIterator(
dataFile: DataFile,
schema: IcebergSchema,
table: Table
- ): Iterator[Record] = {
+ ): CloseableIterator[Record] = {
val inputFile: InputFile = table.io().newInputFile(dataFile)
val readerFunc
: java.util.function.Function[org.apache.parquet.schema.MessageType,
ParquetValueReader[
@@ -422,13 +428,12 @@ object IcebergUtil {
]] =
(messageType: org.apache.parquet.schema.MessageType) =>
GenericParquetReaders.buildReader(schema, messageType)
- val closeableIterable: CloseableIterable[Record] =
- Parquet
- .read(inputFile)
- .project(schema)
- .createReaderFunc(readerFunc)
- .build()
- closeableIterable.iterator().asScala
+ Parquet
+ .read(inputFile)
+ .project(schema)
+ .createReaderFunc(readerFunc)
+ .build()
+ .iterator()
}
}
diff --git a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
index 6184ce8dcd..0e9b2ae68a 100644
--- a/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
+++ b/common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala
@@ -282,6 +282,27 @@ class IcebergDocumentSpec extends VirtualDocumentSpec[Tuple] with BeforeAndAfter
)
}
+ it should "expose written rows as a non-empty ZIP via asInputStream" in {
+ val items = generateSampleItems().take(3)
+ val writer = document.writer(UUID.randomUUID().toString)
+ writer.open()
+ items.foreach(writer.putOne)
+ writer.close()
+
+ val stream = document.asInputStream()
+ try {
+ val bytes = stream.readAllBytes()
+ assert(bytes.nonEmpty, "asInputStream should yield non-empty bytes after writes")
+ // ZIP local-file-header magic bytes: 0x50 0x4B 0x03 0x04 ("PK\x03\x04").
+ assert(
+ bytes(0) == 0x50.toByte && bytes(1) == 0x4b.toByte,
+ "expected ZIP magic bytes at the start of the stream"
+ )
+ } finally {
+ stream.close()
+ }
+ }
+
/** Returns a dynamic proxy for `realTable` that increments `counter` on every `refresh()` call. */
private def tableWithRefreshSpy(realTable: Table, counter: AtomicInteger): Table =
Proxy