This is an automated email from the ASF dual-hosted git repository. xushiyan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 98616b1196b [HUDI-7244] Ensure HoodieFileGroupReader.close() is called in spark (#10381) 98616b1196b is described below commit 98616b1196bdcdd8567b4b13dc38d0e305340aba Author: Jon Vexler <jbvex...@gmail.com> AuthorDate: Tue Jan 2 21:30:25 2024 -0500 [HUDI-7244] Ensure HoodieFileGroupReader.close() is called in spark (#10381) --------- Co-authored-by: Jonathan Vexler <=> --- .../hudi/util/CloseableInternalRowIterator.scala | 7 +++++- .../common/table/read/HoodieFileGroupReader.java | 4 +++- ...odieFileGroupReaderBasedParquetFileFormat.scala | 26 ++++++++++++++++------ 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala index 30a5e93fb63..bf71a9c6a41 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/util/CloseableInternalRowIterator.scala @@ -23,6 +23,8 @@ import org.apache.hudi.common.util.collection.ClosableIterator import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.vectorized.ColumnarBatch +import java.io.Closeable + /** * A [[ClosableIterator]] returning [[InternalRow]] by iterating through the entries returned * by a Spark reader. @@ -37,7 +39,10 @@ class CloseableInternalRowIterator(iterator: Iterator[_]) extends ClosableIterat private var seqInBatch: Int = -1 override def close(): Unit = { - // No op + iterator match { + case iterator: Iterator[_] with Closeable => iterator.close() + case _ => + } } override def hasNext: Boolean = { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java index 52ee14d969e..8edf5d7130e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java @@ -306,7 +306,7 @@ public final class HoodieFileGroupReader<T> implements Closeable { } public static class HoodieFileGroupReaderIterator<T> implements ClosableIterator<T> { - private final HoodieFileGroupReader<T> reader; + private HoodieFileGroupReader<T> reader; public HoodieFileGroupReaderIterator(HoodieFileGroupReader<T> reader) { this.reader = reader; @@ -332,6 +332,8 @@ public final class HoodieFileGroupReader<T> implements Closeable { reader.close(); } catch (IOException e) { throw new HoodieIOException("Failed to close the reader", e); + } finally { + this.reader = null; } } } diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala index 82a38a58841..f2b66e25603 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieFileGroupReaderBasedParquetFileFormat.scala @@ -28,18 +28,19 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, HoodieRecord} import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} import org.apache.hudi.common.table.read.HoodieFileGroupReader -import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, MergeOnReadSnapshotRelation, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} +import org.apache.hudi.{AvroConversionUtils, HoodieFileIndex, HoodiePartitionCDCFileGroupMapping, HoodiePartitionFileSliceMapping, HoodieSparkUtils, HoodieTableSchema, HoodieTableState, SparkAdapterSupport, SparkFileFormatInternalRowReaderContext} import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.JoinedRow import org.apache.spark.sql.execution.datasources.PartitionedFile -import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.{ROW_INDEX, ROW_INDEX_TEMPORARY_COLUMN_NAME, getAppliedFilters, getAppliedRequiredSchema, getLogFilesFromSlice, getRecordKeyRelatedFilters} +import org.apache.spark.sql.execution.datasources.parquet.HoodieFileGroupReaderBasedParquetFileFormat.{ROW_INDEX, ROW_INDEX_TEMPORARY_COLUMN_NAME, getAppliedFilters, getAppliedRequiredSchema, getLogFilesFromSlice, getRecordKeyRelatedFilters, makeCloseableFileGroupMappingRecordIterator} import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isMetaField import org.apache.spark.sql.sources.Filter import org.apache.spark.sql.types.{LongType, Metadata, MetadataBuilder, StringType, StructField, StructType} import org.apache.spark.util.SerializableConfiguration +import java.io.Closeable import scala.annotation.tailrec import scala.collection.mutable import scala.jdk.CollectionConverters.asScalaIteratorConverter @@ -156,7 +157,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState, reader.initRecordIterators() // Append partition values to rows and project to output schema appendPartitionAndProject( - reader.getClosableIterator.asInstanceOf[java.util.Iterator[InternalRow]].asScala, + reader.getClosableIterator, requiredSchema, partitionSchema, outputSchema, @@ -197,7 +198,7 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState, props) } - private def appendPartitionAndProject(iter: Iterator[InternalRow], + private def appendPartitionAndProject(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow], inputSchema: StructType, partitionSchema: StructType, to: StructType, @@ -207,15 +208,15 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState, } else { val unsafeProjection = generateUnsafeProjection(StructType(inputSchema.fields ++ partitionSchema.fields), to) val joinedRow = new JoinedRow() - iter.map(d => unsafeProjection(joinedRow(d, partitionValues))) + makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(joinedRow(d, partitionValues))) } } - private def projectSchema(iter: Iterator[InternalRow], + private def projectSchema(iter: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow], from: StructType, to: StructType): Iterator[InternalRow] = { val unsafeProjection = generateUnsafeProjection(from, to) - iter.map(d => unsafeProjection(d)) + makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(d)) } private def generateRequiredSchemaWithMandatory(requiredSchema: StructType, @@ -414,4 +415,15 @@ object HoodieFileGroupReaderBasedParquetFileFormat { def shouldAddRecordKeyFilters(shouldUseRecordPosition: Boolean): Boolean = { (!shouldUseRecordPosition) || HoodieSparkUtils.gteqSpark3_5 } + + def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow], + mappingFunction: Function[InternalRow, InternalRow]): Iterator[InternalRow] = { + new Iterator[InternalRow] with Closeable { + override def hasNext: Boolean = closeableFileGroupRecordIterator.hasNext + + override def next(): InternalRow = mappingFunction(closeableFileGroupRecordIterator.next()) + + override def close(): Unit = closeableFileGroupRecordIterator.close() + } + } }