This is an automated email from the ASF dual-hosted git repository.

stream2000 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git

The following commit(s) were added to refs/heads/master by this push:
     new eae5d4ae8e6  [HUDI-7291] Pushing Down Partition Pruning Conditions to Column Stats Earlier During Data Skipping (#10493)
eae5d4ae8e6 is described below

commit eae5d4ae8e62014191fac76bbbeae0939f11100b
Author: majian <47964462+majian1...@users.noreply.github.com>
AuthorDate: Wed Jan 17 14:17:29 2024 +0800

    [HUDI-7291] Pushing Down Partition Pruning Conditions to Column Stats Earlier During Data Skipping (#10493)

    * push down partition pruning filters when loading col stats index
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala | 14 ++++++--
 .../scala/org/apache/hudi/HoodieFileIndex.scala   | 37 ++++++++++++++--------
 2 files changed, 36 insertions(+), 15 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 9cdb15092b0..7a75c6c35ca 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model._
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
+import org.apache.hudi.common.function.SerializableFunction
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.BinaryUtil.toBytes
@@ -106,14 +107,23 @@ class ColumnStatsIndexSupport(spark: SparkSession,
    *
    * Please check out scala-doc of the [[transpose]] method explaining this view in more details
    */
-  def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: Boolean)(block: DataFrame => T): T = {
+  def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: Boolean, prunedFileNames: Set[String] = Set.empty)(block: DataFrame => T): T = {
     cachedColumnStatsIndexViews.get(targetColumns) match {
       case Some(cachedDF) =>
         block(cachedDF)
 
       case None =>
-        val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+        val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = if (prunedFileNames.isEmpty) {
+          // NOTE: Some tests call this method directly without supplying prunedPartitionsAndFileSlices, so the unfiltered path must keep working
           loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+        } else {
+          val filterFunction = new SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] {
+            override def apply(r: HoodieMetadataColumnStats): java.lang.Boolean = {
+              prunedFileNames.contains(r.getFileName)
+            }
+          }
+          loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory).filter(filterFunction)
+        }
 
         withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
           val (transposedRows, indexSchema) = transpose(colStatsRecords, targetColumns)
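To make the intent of the loadTransposed change easier to see, here is a minimal self-contained sketch of the same idea in plain Scala. StatsRecord, loadStats and the sample file names are illustrative stand-ins, not Hudi's API: stats records belonging to files already eliminated by partition pruning are dropped up front, while an empty pruned set preserves the old unfiltered behaviour.

object LoadTransposedSketch {
  // Illustrative stand-in for a single column-stats record keyed by file name.
  final case class StatsRecord(fileName: String, column: String, min: Int, max: Int)

  def loadStats(): Seq[StatsRecord] = Seq(
    StatsRecord("f1.parquet", "a", 0, 10),
    StatsRecord("f2.parquet", "a", 5, 20),
    StatsRecord("f3.parquet", "a", 30, 40))

  // Mirrors the patched loadTransposed: an empty set means "no pruning info",
  // keeping the unfiltered path; otherwise only stats of pruned files survive.
  def loadPruned(prunedFileNames: Set[String]): Seq[StatsRecord] = {
    val all = loadStats()
    if (prunedFileNames.isEmpty) all
    else all.filter(r => prunedFileNames.contains(r.fileName))
  }

  def main(args: Array[String]): Unit = {
    println(loadPruned(Set.empty).map(_.fileName))                        // List(f1.parquet, f2.parquet, f3.parquet)
    println(loadPruned(Set("f1.parquet", "f2.parquet")).map(_.fileName))  // List(f1.parquet, f2.parquet)
  }
}

The payoff is that the expensive transpose step (and any caching of the transposed view) now operates on a strictly smaller record set whenever partition pruning was effective.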
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 709dfec183b..db8525be3d1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -234,7 +234,7 @@ case class HoodieFileIndex(spark: SparkSession,
     //    - Record-level Index is present
     //    - List of predicates (filters) is present
     val candidateFilesNamesOpt: Option[Set[String]] =
-      lookupCandidateFilesInMetadataTable(dataFilters) match {
+      lookupCandidateFilesInMetadataTable(dataFilters, prunedPartitionsAndFileSlices) match {
         case Success(opt) => opt
         case Failure(e) =>
           logError("Failed to lookup candidate files in File Index", e)
@@ -316,11 +316,6 @@ case class HoodieFileIndex(spark: SparkSession,
       })
   }
 
-  private def lookupFileNamesMissingFromIndex(allIndexedFileNames: Set[String]) = {
-    val allFileNames = getAllFiles().map(f => f.getPath.getName).toSet
-    allFileNames -- allIndexedFileNames
-  }
-
   /**
    * Computes pruned list of candidate base-files' names based on provided list of {@link dataFilters}
    * conditions, by leveraging Metadata Table's Record Level Index and Column Statistics index (hereon referred as
@@ -333,7 +328,7 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from querying engine
    * @return list of pruned (data-skipped) candidate base-files and log files' names
    */
-  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression]): Try[Option[Set[String]]] = Try {
+  private def lookupCandidateFilesInMetadataTable(queryFilters: Seq[Expression], prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Try[Option[Set[String]]] = Try {
     // NOTE: For column stats, Data Skipping is only effective when it references columns that are indexed w/in
     //       the Column Stats Index (CSI). Following cases could not be effectively handled by Data Skipping:
     //          - Expressions on top-level column's fields (ie, for ex filters like "struct.field > 0", since
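Since lookupCandidateFilesInMetadataTable now receives the pruned partitions and file slices as input, the following self-contained sketch models the shape of that argument with plain case classes. PartitionPath and FileSlice here are stand-ins for Hudi's BaseHoodieTableFileIndex.PartitionPath and FileSlice, and the values are invented:

object PrunedInputSketch {
  // Stand-ins for Hudi's BaseHoodieTableFileIndex.PartitionPath and FileSlice.
  final case class PartitionPath(path: String)
  final case class FileSlice(baseFile: Option[String], logFiles: List[String])

  // Each entry pairs an (optional) partition with the file slices that survived
  // partition pruning; only these slices' stats need to be consulted downstream.
  val prunedPartitionsAndFileSlices: Seq[(Option[PartitionPath], Seq[FileSlice])] = Seq(
    (Some(PartitionPath("dt=2024-01-16")), Seq(FileSlice(Some("f1.parquet"), List("f1.log")))),
    (Some(PartitionPath("dt=2024-01-17")), Seq(FileSlice(Some("f2.parquet"), Nil))))

  def main(args: Array[String]): Unit =
    println(prunedPartitionsAndFileSlices.flatMap(_._2).size) // 2 file slices in total
}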
@@ -345,7 +340,6 @@ case class HoodieFileIndex(spark: SparkSession,
     //       and candidate files are obtained from these file slices.
 
     lazy val queryReferencedColumns = collectReferencedColumns(spark, queryFilters, schema)
-
     lazy val (_, recordKeys) = recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
 
     if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
       validateConfig()
     } else if (recordKeys.nonEmpty) {
       Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), recordKeys))
     } else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) {
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
       val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, queryReferencedColumns)
       val indexDf = functionalIndex.loadFunctionalIndexDataFrame("", shouldReadInMemory)
-      Some(getCandidateFiles(indexDf, queryFilters))
+      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
     } else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || queryReferencedColumns.isEmpty) {
       validateConfig()
       Option.empty
@@ -366,14 +361,30 @@ case class HoodieFileIndex(spark: SparkSession,
       // For that we use a simple-heuristic to determine whether we should read and process CSI in-memory or
       // on-cluster: total number of rows of the expected projected portion of the index has to be below the
       // threshold (of 100k records)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
       val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, queryReferencedColumns)
-      columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory) { transposedColStatsDF =>
-        Some(getCandidateFiles(transposedColStatsDF, queryFilters))
+      columnStatsIndex.loadTransposed(queryReferencedColumns, shouldReadInMemory, prunedFileNames) { transposedColStatsDF =>
+        Some(getCandidateFiles(transposedColStatsDF, queryFilters, prunedFileNames))
       }
     }
   }
 
-  private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression]): Set[String] = {
+  private def getPrunedFileNames(prunedPartitionsAndFileSlices: Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): Set[String] = {
+    prunedPartitionsAndFileSlices
+      .flatMap {
+        case (_, fileSlices) => fileSlices
+      }
+      .flatMap { fileSlice =>
+        val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
+        val logFiles = if (includeLogFiles) {
+          fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
+        } else Nil
+        baseFileOption.map(_.getFileName).toList ++ logFiles
+      }
+      .toSet
+  }
+
+  private def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
     val indexSchema = indexDf.schema
     val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema)).reduce(And)
     val prunedCandidateFileNames =
@@ -395,7 +406,7 @@ case class HoodieFileIndex(spark: SparkSession,
         .collect()
         .map(_.getString(0))
         .toSet
-    val notIndexedFileNames = lookupFileNamesMissingFromIndex(allIndexedFileNames)
+    val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
 
     prunedCandidateFileNames ++ notIndexedFileNames
   }
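Taken together, getPrunedFileNames and the new notIndexedFileNames computation can be modelled end to end in plain Scala. In this sketch FileSlice and includeLogFiles are stand-ins for Hudi's types and configuration, and all file names are invented:

object CandidateFilesSketch {
  // Stand-in for Hudi's FileSlice: one optional base file plus its log files.
  final case class FileSlice(baseFile: Option[String], logFiles: List[String])

  // Models getPrunedFileNames: collect base-file names from every pruned slice,
  // plus log-file names when the index is configured to include log files.
  def prunedFileNames(slices: Seq[FileSlice], includeLogFiles: Boolean): Set[String] =
    slices.flatMap { slice =>
      val logs = if (includeLogFiles) slice.logFiles else Nil
      slice.baseFile.toList ++ logs
    }.toSet

  def main(args: Array[String]): Unit = {
    val slices = Seq(
      FileSlice(Some("f1.parquet"), List("f1.log")),
      FileSlice(Some("f2.parquet"), Nil))
    val pruned = prunedFileNames(slices, includeLogFiles = true)

    // Files with no column-stats entry can never be skipped via the index, so
    // they must remain candidates. Before this patch the complement was taken
    // against *all* files in the table; now only against the pruned set.
    val allIndexedFileNames = Set("f1.parquet")      // files present in CSI
    val prunedCandidateFileNames = Set("f1.parquet") // files the stats filter kept
    val notIndexedFileNames = pruned -- allIndexedFileNames
    println(prunedCandidateFileNames ++ notIndexedFileNames)
    // contains f1.parquet, f1.log, f2.parquet (Set ordering may vary)
  }
}

This is also why lookupFileNamesMissingFromIndex could be deleted: its job, finding files the index knows nothing about, is now done directly against the already-pruned file set rather than against a full table listing.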