viirya commented on code in PR #51528:
URL: https://github.com/apache/spark/pull/51528#discussion_r2214695295
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetPartitionReaderFactory.scala:
##########
@@ -337,8 +340,39 @@ case class ParquetPartitionReaderFactory(
capacity)
val iter = new RecordReaderIterator(vectorizedReader)
// SPARK-23457 Register a task completion listener before `initialization`.
- taskContext.foreach(_.addTaskCompletionListener[Unit](_ => iter.close()))
+ parquetReaderCallback.advanceFile(iter)
+ taskContext.foreach(parquetReaderCallback.initIfNotAlready)
logDebug(s"Appending $partitionSchema $partitionValues")
vectorizedReader
}
}
+
+/**
+ * A callback class to handle the cleanup of Parquet readers.
+ *
+ * This class is used to ensure that the Parquet readers are closed properly when the task
+ * completes, and it also allows for the initialization of the reader callback only once per task.
+ */
+private class ParquetReaderCallback extends Serializable {
+ private var init: Boolean = false
+ private var iter: RecordReaderIterator[_] = null
+
+ def initIfNotAlready(taskContext: TaskContext): Unit = {
+ if (!init) {
+ taskContext.addTaskCompletionListener[Unit](_ => execute())
+ init = true
+ }
+ }
+
+ def advanceFile(iter: RecordReaderIterator[_]): Unit = {
+ execute()
+
+ this.iter = iter
+ }
+
+ def execute(): Unit = {
Review Comment:
Okay
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]