This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c8cde9b7d47 [HUDI-8849] Reenable TestExpressionIndex (#12607)
c8cde9b7d47 is described below
commit c8cde9b7d4794fd37e2fe3b0b6a76d53c4db9041
Author: Lokesh Jain <[email protected]>
AuthorDate: Fri Jan 10 21:15:39 2025 +0530
[HUDI-8849] Reenable TestExpressionIndex (#12607)
* Reenable TestExpressionIndex
* Fix test failures
---
.../main/scala/org/apache/hudi/ExpressionIndexSupport.scala | 2 +-
.../main/scala/org/apache/hudi/SparkBaseIndexSupport.scala | 11 ++++++++---
.../spark/sql/hudi/command/index/TestExpressionIndex.scala | 6 ++++--
3 files changed, 13 insertions(+), 6 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 8dd9b62a62c..983b6482d6f 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -82,7 +82,7 @@ class ExpressionIndexSupport(spark: SparkSession,
val (prunedPartitions, prunedFileNames) =
getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
val expressionIndexRecords =
loadExpressionIndexRecords(indexPartition, prunedPartitions, readInMemory)
loadTransposed(queryReferencedColumns, readInMemory,
expressionIndexRecords, expressionIndexQuery) {
- transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF,
Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true))
+ transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF,
Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true,
Option.apply(indexDefinition)))
}
} else if
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS))
{
val prunedPartitionAndFileNames =
getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices,
includeLogFiles = true)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index 06c012ac608..9cd53a75ff5 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -21,7 +21,7 @@ package org.apache.hudi
import org.apache.hudi.HoodieFileIndex.DataSkippingFailureMode
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.FileSlice
+import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.keygen.KeyGenUtils
import org.apache.hudi.keygen.KeyGenUtils.DEFAULT_RECORD_KEY_PARTS_SEPARATOR
@@ -102,8 +102,13 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
(prunedPartitions, prunedFiles)
}
- protected def getCandidateFiles(indexDf: DataFrame, queryFilters:
Seq[Expression], fileNamesFromPrunedPartitions: Set[String], isExpressionIndex:
Boolean = false): Set[String] = {
- val indexedCols : Seq[String] =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+ protected def getCandidateFiles(indexDf: DataFrame, queryFilters:
Seq[Expression], fileNamesFromPrunedPartitions: Set[String],
+ isExpressionIndex: Boolean = false,
indexDefinitionOpt: Option[HoodieIndexDefinition] = Option.empty): Set[String]
= {
+ val indexedCols : Seq[String] = if (indexDefinitionOpt.isDefined) {
+ indexDefinitionOpt.get.getSourceFields.asScala.toSeq
+ } else {
+
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+ }
val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, isExpressionIndex,
indexedCols)).reduce(And)
if (indexFilter.equals(TrueLiteral)) {
// if there are any non indexed cols or we can't translate source expr,
we have to read all files and may not benefit from col stats lookup.
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
index 4a57e1ed59b..9d8c5613f1b 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
@@ -57,12 +57,10 @@ import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, SaveMode, functions}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
-import org.scalatest.Ignore
import java.util.stream.Collectors
import scala.collection.JavaConverters
-@Ignore
class TestExpressionIndex extends HoodieSparkSqlTestBase {
override protected def beforeAll(): Unit = {
@@ -777,6 +775,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
|location '$basePath'
|""".stripMargin)
+ setCompactionConfigs(tableType)
spark.sql("set hoodie.parquet.small.file.limit=0")
if (HoodieSparkUtils.gteqSpark3_4) {
spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -856,6 +855,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
|location '$basePath'
|""".stripMargin)
+ setCompactionConfigs(tableType)
spark.sql("set hoodie.parquet.small.file.limit=0")
if (HoodieSparkUtils.gteqSpark3_4) {
spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1471,6 +1471,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
|location '$basePath'
|""".stripMargin)
+ setCompactionConfigs(tableType)
spark.sql("set hoodie.parquet.small.file.limit=0")
if (HoodieSparkUtils.gteqSpark3_4) {
spark.sql("set spark.sql.defaultColumn.enabled=false")
@@ -1618,6 +1619,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
|location '$basePath'
|""".stripMargin)
+ setCompactionConfigs(tableType)
spark.sql("set hoodie.parquet.small.file.limit=0")
spark.sql("set hoodie.fileIndex.dataSkippingFailureMode=strict")
if (HoodieSparkUtils.gteqSpark3_4) {