This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 98616b1196b [HUDI-7244] Ensure HoodieFileGroupReader.close() is called in spark (#10381)
98616b1196b is described below

commit 98616b1196bdcdd8567b4b13dc38d0e305340aba
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Tue Jan 2 21:30:25 2024 -0500

    [HUDI-7244] Ensure HoodieFileGroupReader.close() is called in spark (#10381)
    
    
    ---------
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/util/CloseableInternalRowIterator.scala   |  7 +++++-
 .../common/table/read/HoodieFileGroupReader.java   |  4 +++-
 ...odieFileGroupReaderBasedParquetFileFormat.scala | 26 ++++++++++++++++------
 3 files changed, 28 insertions(+), 9 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
index 30a5e93fb63..bf71a9c6a41 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala
@@ -23,6 +23,8 @@ import org.apache.hudi.common.util.collection.ClosableIterator
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
+import java.io.Closeable
+
 /**
  * A [[ClosableIterator]] returning [[InternalRow]] by iterating through the entries returned
  * by a Spark reader.
@@ -37,7 +39,10 @@ class CloseableInternalRowIterator(iterator: Iterator[_]) extends ClosableIterat
   private var seqInBatch: Int = -1
 
   override def close(): Unit = {
-    // No op
+    iterator match {
+      case iterator: Iterator[_] with Closeable => iterator.close()
+      case _ =>
+    }
   }
 
   override def hasNext: Boolean = {
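
For context on the close() change above: the Spark reader iterators wrapped by CloseableInternalRowIterator may or may not own native resources, and the type test against Iterator[_] with Closeable releases them only when they do. A minimal standalone sketch of the pattern, with ResourceBackedIterator and closeIfCloseable as hypothetical names (not part of the patch):

import java.io.Closeable

// Hypothetical stand-in for a reader-backed iterator that owns resources.
class ResourceBackedIterator[A](rows: Iterator[A]) extends Iterator[A] with Closeable {
  override def hasNext: Boolean = rows.hasNext
  override def next(): A = rows.next()
  override def close(): Unit = println("underlying reader closed")
}

// Same shape as the new CloseableInternalRowIterator.close(): close the
// wrapped iterator only if it is also Closeable, else do nothing.
def closeIfCloseable(iterator: Iterator[_]): Unit = iterator match {
  case closeable: Iterator[_] with Closeable => closeable.close()
  case _ => // plain iterators hold no resources
}

closeIfCloseable(new ResourceBackedIterator(Iterator(1, 2, 3))) // "underlying reader closed"
closeIfCloseable(Iterator(1, 2, 3))                             // no-op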
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 52ee14d969e..8edf5d7130e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -306,7 +306,7 @@ public final class HoodieFileGroupReader<T> implements Closeable {
   }
 
   public static class HoodieFileGroupReaderIterator<T> implements ClosableIterator<T> {
-    private final HoodieFileGroupReader<T> reader;
+    private HoodieFileGroupReader<T> reader;
 
     public HoodieFileGroupReaderIterator(HoodieFileGroupReader<T> reader) {
       this.reader = reader;
@@ -332,6 +332,8 @@ public final class HoodieFileGroupReader<T> implements Closeable {
         reader.close();
       } catch (IOException e) {
         throw new HoodieIOException("Failed to close the reader", e);
+      } finally {
+        this.reader = null;
       }
     }
   }
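
The field change above (dropping final) lets close() release the reader reference in a finally block, so the HoodieFileGroupReader becomes garbage-collectable even if closing it throws. A minimal Scala sketch of the same pattern, using hypothetical Reader/ReaderIterator names; the null guard is illustrative and not part of the patch:

import java.io.Closeable

class Reader extends Closeable {
  override def close(): Unit = println("reader closed")
}

class ReaderIterator(private var reader: Reader) extends Closeable {
  override def close(): Unit = {
    if (reader != null) {
      try reader.close()
      finally reader = null // reference dropped even when close() throws
    }
  }
}

val it = new ReaderIterator(new Reader)
it.close() // prints "reader closed"
it.close() // safe no-op: the reference was already dropped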
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
index 82a38a58841..f2b66e25603 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala
@@ -28,18 +28,19 @@ import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord}
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient}
 import org.apache.hudi.common.table.read.HoodieFileGroupReader
-import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
+import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext}
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.JoinedRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
-import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.{ROW_INDEX, ROW_INDEX_TEMPORARY_COLUMN_NAME, getAppliedFilters, getAppliedRequiredSchema, getLogFilesFromSlice, getRecordKeyRelatedFilters}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.{ROW_INDEX, ROW_INDEX_TEMPORARY_COLUMN_NAME, getAppliedFilters, getAppliedRequiredSchema, getLogFilesFromSlice, getRecordKeyRelatedFilters, makeCloseableFileGroupMappingRecordIterator}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StringType, StructField, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
+import java.io.Closeable
 import scala.annotation.tailrec
 import scala.collection.mutable
 import scala.jdk.CollectionConverters.asScalaIteratorConverter
@@ -156,7 +157,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
                 reader.initRecordIterators()
                 // Append partition values to rows and project to output schema
                 appendPartitionAndProject(
-                  reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala,
+                  reader.getClosableIterator,
                   requiredSchema,
                   partitionSchema,
                   outputSchema,
@@ -197,7 +198,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
       props)
   }
 
-  private def appendPartitionAndProject(iter: Iterator[InternalRow],
+  private def appendPartitionAndProject(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
                                         inputSchema: StructType,
                                         partitionSchema: StructType,
                                         to: StructType,
@@ -207,15 +208,15 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
     } else {
       val unsafeProjection = generateUnsafeProjection(StructType(inputSchema.fields ++ partitionSchema.fields), to)
       val joinedRow = new JoinedRow()
-      iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
+      makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(joinedRow(d, partitionValues)))
     }
   }
 
-  private def projectSchema(iter: Iterator[InternalRow],
+  private def projectSchema(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
                             from: StructType,
                             to: StructType): Iterator[InternalRow] = {
     val unsafeProjection = generateUnsafeProjection(from, to)
-    iter.map(d => unsafeProjection(d))
+    makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(d))
   }
 
   private def generateRequiredSchemaWithMandatory(requiredSchema: StructType,
@@ -414,4 +415,15 @@ object HoodieFileGroupReaderBasedParquetFileFormat {
   def shouldAddRecordKeyFilters(shouldUseRecordPosition: Boolean): Boolean = {
     (!shouldUseRecordPosition) || HoodieSparkUtils.gteqSpark3_5
   }
+
+  def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
+                                                  mappingFunction: Function[InternalRow, InternalRow]): Iterator[InternalRow] = {
+    new Iterator[InternalRow] with Closeable {
+      override def hasNext: Boolean = closeableFileGroupRecordIterator.hasNext
+
+      override def next(): InternalRow = mappingFunction(closeableFileGroupRecordIterator.next())
+
+      override def close(): Unit = closeableFileGroupRecordIterator.close()
+    }
+  }
 }
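
The helper added above matters because a plain iter.map(...) returns an anonymous Iterator that no longer implements Closeable, so the pattern match in CloseableInternalRowIterator.close() could never reach the underlying HoodieFileGroupReader; wrapping the mapping step in an Iterator with Closeable keeps the close() path intact through the projection. A minimal sketch of the idea, with ClosingIterator and closeableMap as hypothetical names:

import java.io.Closeable

// Hypothetical closeable source, standing in for HoodieFileGroupReaderIterator.
class ClosingIterator[A](underlying: Iterator[A])(onClose: () => Unit)
  extends Iterator[A] with Closeable {
  override def hasNext: Boolean = underlying.hasNext
  override def next(): A = underlying.next()
  override def close(): Unit = onClose()
}

// Same shape as makeCloseableFileGroupMappingRecordIterator: map each record
// while forwarding close() to the source iterator.
def closeableMap[A, B](source: Iterator[A] with Closeable)(f: A => B): Iterator[B] with Closeable =
  new Iterator[B] with Closeable {
    override def hasNext: Boolean = source.hasNext
    override def next(): B = f(source.next())
    override def close(): Unit = source.close()
  }

val source = new ClosingIterator(Iterator(1, 2, 3))(() => println("closed"))
println(source.map(_ * 2).isInstanceOf[Closeable]) // false: map erases Closeable
val mapped = closeableMap(source)(_ * 2)
println(mapped.isInstanceOf[Closeable])            // true: close() still reachable
mapped.close()                                     // prints "closed"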
