Yicong-Huang commented on code in PR #5149:
URL: https://github.com/apache/texera/pull/5149#discussion_r3292325699


##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/result/iceberg/IcebergDocument.scala:
##########
@@ -202,38 +207,43 @@ private[storage] class IcebergDocument[T >: Null <: AnyRef](
             }
             table.foreach(_.refresh())
 
-            // Retrieve and sort the file scan tasks by file sequence number
+            // Retrieve and sort the file scan tasks by file sequence number.
+            // Materialize inside `Using.resource` so the `planFiles()`
+            // CloseableIterable is released after collection.
             val fileScanTasksIterator: Iterator[FileScanTask] = table match {
               case Some(t) =>
                 val currentSnapshotId = Option(t.currentSnapshot()).map(_.snapshotId())
-                val fileScanTasks = (lastSnapshotId, currentSnapshotId) match {
-                  // Read from the start
-                  case (None, Some(_)) =>
-                    val tasks = t.newScan().planFiles().iterator().asScala
-                    lastSnapshotId = currentSnapshotId
-                    tasks
-
-                  // Read incrementally from the last snapshot
-                  case (Some(lastId), Some(currId)) if lastId != currId =>
-                    val tasks = t
-                      .newIncrementalAppendScan()
-                      .fromSnapshotExclusive(lastId)
-                      .toSnapshot(currId)
-                      .planFiles()
-                      .iterator()
-                      .asScala
-                    lastSnapshotId = currentSnapshotId
-                    tasks
-
-                  // No new data
-                  case (Some(lastId), Some(currId)) if lastId == currId =>
-                    Iterator.empty
-
-                  // Default: No data yet
-                  case _ =>
-                    Iterator.empty
-                }
-                fileScanTasks.toSeq.sortBy(_.file().fileSequenceNumber()).iterator
+                val fileScanTasks: Seq[FileScanTask] =
+                  (lastSnapshotId, currentSnapshotId) match {
+                    // Read from the start
+                    case (None, Some(_)) =>
+                      val tasks = Using.resource(t.newScan().planFiles()) { ci =>
+                        ci.iterator().asScala.toSeq
+                      }
+                      lastSnapshotId = currentSnapshotId
+                      tasks
+
+                    // Read incrementally from the last snapshot
+                    case (Some(lastId), Some(currId)) if lastId != currId =>
+                      val tasks = Using.resource(

Review Comment:
   ok



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to