This is an automated email from the ASF dual-hosted git repository.

stream2000 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new eae5d4ae8e6 [HUDI-7291] Pushing Down Partition Pruning Conditions to Column Stats Earlier During Data Skipping (#10493)
eae5d4ae8e6 is described below

commit eae5d4ae8e62014191fac76bbbeae0939f11100b
Author: majian <47964462+majian1...@users.noreply.github.com>
AuthorDate: Wed Jan 17 14:17:29 2024 +0800

    [HUDI-7291] Pushing Down Partition Pruning Conditions to Column Stats Earlier During Data Skipping (#10493)
    
    * push down partition pruning filters when loading col stats index
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  | 14 ++++++--
 .../scala/org/apache/hudi/HoodieFileIndex.scala    | 37 ++++++++++++++--------
 2 files changed, 36 insertions(+), 15 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index 9cdb15092b0..7a75c6c35ca 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -26,6 +26,7 @@ import org.apache.hudi.avro.model._
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
+import org.apache.hudi.common.function.SerializableFunction
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.BinaryUtil.toBytes
@@ -106,14 +107,23 @@ class ColumnStatsIndexSupport(spark: SparkSession,
    *
    * Please check out scala-doc of the [[transpose]] method explaining this 
view in more details
    */
-  def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: 
Boolean)(block: DataFrame => T): T = {
+  def loadTransposed[T](targetColumns: Seq[String], shouldReadInMemory: 
Boolean, prunedFileNames: Set[String] = Set.empty)(block: DataFrame => T): T = {
     cachedColumnStatsIndexViews.get(targetColumns) match {
       case Some(cachedDF) =>
         block(cachedDF)
 
       case None =>
-        val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+        val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = if 
(prunedFileNames.isEmpty) {
+          // NOTE: Because some tests directly check this method and don't get 
prunedPartitionsAndFileSlices, we need to make sure these tests are correct.
           loadColumnStatsIndexRecords(targetColumns, shouldReadInMemory)
+        } else {
+          val filterFunction = new 
SerializableFunction[HoodieMetadataColumnStats, java.lang.Boolean] {
+            override def apply(r: HoodieMetadataColumnStats): 
java.lang.Boolean = {
+              prunedFileNames.contains(r.getFileName)
+            }
+          }
+          loadColumnStatsIndexRecords(targetColumns, 
shouldReadInMemory).filter(filterFunction)
+        }
 
         withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
           val (transposedRows, indexSchema) = transpose(colStatsRecords, 
targetColumns)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 709dfec183b..db8525be3d1 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -234,7 +234,7 @@ case class HoodieFileIndex(spark: SparkSession,
       //    - Record-level Index is present
       //    - List of predicates (filters) is present
       val candidateFilesNamesOpt: Option[Set[String]] =
-      lookupCandidateFilesInMetadataTable(dataFilters) match {
+      lookupCandidateFilesInMetadataTable(dataFilters, 
prunedPartitionsAndFileSlices) match {
         case Success(opt) => opt
         case Failure(e) =>
           logError("Failed to lookup candidate files in File Index", e)
@@ -316,11 +316,6 @@ case class HoodieFileIndex(spark: SparkSession,
     })
   }
 
-  private def lookupFileNamesMissingFromIndex(allIndexedFileNames: 
Set[String]) = {
-    val allFileNames = getAllFiles().map(f => f.getPath.getName).toSet
-    allFileNames -- allIndexedFileNames
-  }
-
   /**
    * Computes pruned list of candidate base-files' names based on provided 
list of {@link dataFilters}
    * conditions, by leveraging Metadata Table's Record Level Index and Column 
Statistics index (hereon referred as
@@ -333,7 +328,7 @@ case class HoodieFileIndex(spark: SparkSession,
    * @param queryFilters list of original data filters passed down from 
querying engine
    * @return list of pruned (data-skipped) candidate base-files and log files' 
names
    */
-  private def lookupCandidateFilesInMetadataTable(queryFilters: 
Seq[Expression]): Try[Option[Set[String]]] = Try {
+  private def lookupCandidateFilesInMetadataTable(queryFilters: 
Seq[Expression], prunedPartitionsAndFileSlices: 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): 
Try[Option[Set[String]]] = Try {
     // NOTE: For column stats, Data Skipping is only effective when it 
references columns that are indexed w/in
     //       the Column Stats Index (CSI). Following cases could not be 
effectively handled by Data Skipping:
     //          - Expressions on top-level column's fields (ie, for ex filters 
like "struct.field > 0", since
@@ -345,7 +340,6 @@ case class HoodieFileIndex(spark: SparkSession,
     //       and candidate files are obtained from these file slices.
 
     lazy val queryReferencedColumns = collectReferencedColumns(spark, 
queryFilters, schema)
-
     lazy val (_, recordKeys) = 
recordLevelIndex.filterQueriesWithRecordKey(queryFilters)
     if (!isMetadataTableEnabled || !isDataSkippingEnabled) {
       validateConfig()
@@ -353,9 +347,10 @@ case class HoodieFileIndex(spark: SparkSession,
     } else if (recordKeys.nonEmpty) {
       Option.apply(recordLevelIndex.getCandidateFiles(getAllFiles(), 
recordKeys))
     } else if (functionalIndex.isIndexAvailable && !queryFilters.isEmpty) {
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
       val shouldReadInMemory = functionalIndex.shouldReadInMemory(this, 
queryReferencedColumns)
       val indexDf = functionalIndex.loadFunctionalIndexDataFrame("", 
shouldReadInMemory)
-      Some(getCandidateFiles(indexDf, queryFilters))
+      Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
     } else if (!columnStatsIndex.isIndexAvailable || queryFilters.isEmpty || 
queryReferencedColumns.isEmpty) {
       validateConfig()
       Option.empty
@@ -366,14 +361,30 @@ case class HoodieFileIndex(spark: SparkSession,
       //       For that we use a simple-heuristic to determine whether we 
should read and process CSI in-memory or
       //       on-cluster: total number of rows of the expected projected 
portion of the index has to be below the
       //       threshold (of 100k records)
+      val prunedFileNames = getPrunedFileNames(prunedPartitionsAndFileSlices)
       val shouldReadInMemory = columnStatsIndex.shouldReadInMemory(this, 
queryReferencedColumns)
-      columnStatsIndex.loadTransposed(queryReferencedColumns, 
shouldReadInMemory) { transposedColStatsDF =>
-        Some(getCandidateFiles(transposedColStatsDF, queryFilters))
+      columnStatsIndex.loadTransposed(queryReferencedColumns, 
shouldReadInMemory, prunedFileNames) { transposedColStatsDF =>
+        Some(getCandidateFiles(transposedColStatsDF, queryFilters, 
prunedFileNames))
       }
     }
   }
 
-  private def getCandidateFiles(indexDf: DataFrame, queryFilters: 
Seq[Expression]): Set[String] = {
+  private def getPrunedFileNames(prunedPartitionsAndFileSlices: 
Seq[(Option[BaseHoodieTableFileIndex.PartitionPath], Seq[FileSlice])]): 
Set[String] = {
+      prunedPartitionsAndFileSlices
+        .flatMap {
+          case (_, fileSlices) => fileSlices
+        }
+        .flatMap { fileSlice =>
+          val baseFileOption = Option(fileSlice.getBaseFile.orElse(null))
+          val logFiles = if (includeLogFiles) {
+            fileSlice.getLogFiles.iterator().asScala.map(_.getFileName).toList
+          } else Nil
+          baseFileOption.map(_.getFileName).toList ++ logFiles
+        }
+        .toSet
+  }
+
+  private def getCandidateFiles(indexDf: DataFrame, queryFilters: 
Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
     val indexSchema = indexDf.schema
     val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, 
indexSchema)).reduce(And)
     val prunedCandidateFileNames =
@@ -395,7 +406,7 @@ case class HoodieFileIndex(spark: SparkSession,
       .collect()
       .map(_.getString(0))
       .toSet
-    val notIndexedFileNames = 
lookupFileNamesMissingFromIndex(allIndexedFileNames)
+    val notIndexedFileNames = prunedFileNames -- allIndexedFileNames
 
     prunedCandidateFileNames ++ notIndexedFileNames
   }

Reply via email to