This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c8cde9b7d47 [HUDI-8849] Reenable TestExpressionIndex (#12607)
c8cde9b7d47 is described below

commit c8cde9b7d4794fd37e2fe3b0b6a76d53c4db9041
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jan 10 21:15:39 2025 +0530

    [HUDI-8849] Reenable TestExpressionIndex (#12607)
    
    * Reenable TestExpressionIndex
    
    * Fix test failures
---
 .../main/scala/org/apache/hudi/ExpressionIndexSupport.scala   |  2 +-
 .../main/scala/org/apache/hudi/SparkBaseIndexSupport.scala    | 11 ++++++++---
 .../spark/sql/hudi/command/index/TestExpressionIndex.scala    |  6 ++++--
 3 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 8dd9b62a62c..983b6482d6f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -82,7 +82,7 @@ class ExpressionIndexSupport(spark: SparkSession,
         val (prunedPartitions, prunedFileNames) = getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
         val expressionIndexRecords = loadExpressionIndexRecords(indexPartition, prunedPartitions, readInMemory)
         loadTransposed(queryReferencedColumns, readInMemory, expressionIndexRecords, expressionIndexQuery) {
-          transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true))
+          transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true, Option.apply(indexDefinition)))
         }
       } else if (indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS)) {
         val prunedPartitionAndFileNames = getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, includeLogFiles = true)
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 06c012ac608..9cd53a75ff5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
 import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.keygen.KeyGenUtils
 import org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR
@@ -102,8 +102,13 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
     (prunedPartitions, prunedFiles)
   }
 
-  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String], isExpressionIndex: Boolean = false): Set[String] = {
-    val indexedCols : Seq[String] = metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: Seq[Expression], fileNamesFromPrunedPartitions: Set[String],
+                                  isExpressionIndex: Boolean = false, indexDefinitionOpt: Option[HoodieIndexDefinition] = Option.empty): Set[String] = {
+    val indexedCols : Seq[String] = if (indexDefinitionOpt.isDefined) {
+      indexDefinitionOpt.get.getSourceFields.asScala.toSeq
+    } else {
+      metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+    }
     val indexFilter = queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex, indexedCols)).reduce(And)
     if (indexFilter.equals(TrueLiteral)) {
       // if there are any non indexed cols or we can't translate source expr, we have to read all files and may not benefit from col stats lookup.
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
index 4a57e1ed59b..9d8c5613f1b 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
@@ -57,12 +57,10 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{Column, SaveMode, functions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
-import org.scalatest.Ignore
 
 import java.util.stream.Collectors
 import scala.collection.JavaConverters
 
-@Ignore
 class TestExpressionIndex extends HoodieSparkSqlTestBase {
 
   override protected def beforeAll(): Unit = {
@@ -777,6 +775,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
              |location '$basePath'
              |""".stripMargin)
 
+        setCompactionConfigs(tableType)
         spark.sql("set hoodie.parquet.small.file.limit=0")
         if (HoodieSparkUtils.gteqSpark3_4) {
           spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -856,6 +855,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
              |location '$basePath'
              |""".stripMargin)
 
+        setCompactionConfigs(tableType)
         spark.sql("set hoodie.parquet.small.file.limit=0")
         if (HoodieSparkUtils.gteqSpark3_4) {
           spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1471,6 +1471,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
              |location '$basePath'
              |""".stripMargin)
 
+        setCompactionConfigs(tableType)
         spark.sql("set hoodie.parquet.small.file.limit=0")
         if (HoodieSparkUtils.gteqSpark3_4) {
           spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1618,6 +1619,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
              |location '$basePath'
              |""".stripMargin)
 
+        setCompactionConfigs(tableType)
         spark.sql("set hoodie.parquet.small.file.limit=0")
         spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
         if (HoodieSparkUtils.gteqSpark3_4) {

Reply via email to