This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a1cf2439e98 [HUDI-8643] Filter records for from_unixtime function with 
expression index (#12416)
a1cf2439e98 is described below

commit a1cf2439e983a35c53c9c815a4c8e64e05af615f
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Dec 4 11:06:32 2024 +0530

    [HUDI-8643] Filter records for from_unixtime function with expression index 
(#12416)
    
    * [HUDI-8643] Filter records for from_unixtime function with expression 
index
    
    * fix test by using different table name
    
    * minor code cleanup
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../org/apache/hudi/ColumnStatsIndexSupport.scala  |   6 +-
 .../org/apache/hudi/ExpressionIndexSupport.scala   | 311 +++++++++++++++++++--
 .../scala/org/apache/hudi/HoodieFileIndex.scala    |   2 +-
 .../org/apache/hudi/RecordLevelIndexSupport.scala  |  74 +++++
 .../org/apache/hudi/SparkBaseIndexSupport.scala    |   4 +-
 .../apache/spark/sql/hudi/DataSkippingUtils.scala  |  18 +-
 .../hudi/command/index/TestExpressionIndex.scala   | 112 +++++++-
 7 files changed, 487 insertions(+), 40 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index fc1f15ad409..1becf688324 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -435,10 +435,10 @@ object ColumnStatsIndexSupport {
     String.format("%s_%s", col, statName)
   }
 
-  @inline private def composeColumnStatStructType(col: String, statName: 
String, dataType: DataType) =
+  @inline def composeColumnStatStructType(col: String, statName: String, 
dataType: DataType) =
     StructField(formatColName(col, statName), dataType, nullable = true, 
Metadata.empty)
 
-  private def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
+  def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
     valueWrapper match {
       case w: BooleanWrapper => w.getValue
       case w: IntWrapper => w.getValue
@@ -461,7 +461,7 @@ object ColumnStatsIndexSupport {
 
   val decConv = new DecimalConversion()
 
-  private def deserialize(value: Any, dataType: DataType): Any = {
+  def deserialize(value: Any, dataType: DataType): Any = {
     dataType match {
       // NOTE: Since we can't rely on Avro's "date", and "timestamp-micros" 
logical-types, we're
       //       manually encoding corresponding values as int and long w/in the 
Column Stats Index and
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 37e27ccd1a1..154498b57ec 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -19,34 +19,47 @@
 
 package org.apache.hudi
 
+import org.apache.hudi.ColumnStatsIndexSupport.{composeColumnStatStructType, 
deserialize, tryUnpackValueWrapper}
 import org.apache.hudi.ExpressionIndexSupport._
+import org.apache.hudi.HoodieCatalystUtils.{withPersistedData, 
withPersistedDataset}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieSparkExpressionIndex.SPARK_FUNCTION_MAP
-import org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey
+import org.apache.hudi.RecordLevelIndexSupport.{fetchQueryWithAttribute, 
filterQueryWithRecordKey}
 import org.apache.hudi.avro.model.{HoodieMetadataColumnStats, 
HoodieMetadataRecord}
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.data.HoodieData
 import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition, 
HoodieRecord}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.collection
 import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
 import org.apache.hudi.data.HoodieJavaRDD
+import 
org.apache.hudi.index.functional.HoodieExpressionIndex.SPARK_FROM_UNIXTIME
 import org.apache.hudi.metadata.{HoodieMetadataPayload, 
HoodieTableMetadataUtil, MetadataPartitionType}
 import org.apache.hudi.util.JFunction
-import 
org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, 
createDataFrameFromRDD}
+import 
org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows, 
createDataFrameFromRDD, createDataFrameFromRows}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, 
FromUnixTime, In, Literal, UnaryExpression}
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
 import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
 
 import scala.collection.JavaConverters._
+import scala.collection.immutable.TreeSet
+import scala.collection.mutable.ListBuffer
+import scala.collection.parallel.mutable.ParHashMap
 
 class ExpressionIndexSupport(spark: SparkSession,
+                             tableSchema: StructType,
                              metadataConfig: HoodieMetadataConfig,
                              metaClient: HoodieTableMetaClient)
   extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
 
+  @transient private lazy val cachedColumnStatsIndexViews: 
ParHashMap[Seq[String], DataFrame] = ParHashMap()
+
+
   // NOTE: Since [[metadataConfig]] is transient this has to be eagerly 
persisted, before this will be passed on to the executor
   private val inMemoryProjectionThreshold = 
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
 
@@ -60,13 +73,15 @@ class ExpressionIndexSupport(spark: SparkSession,
                                         ): Option[Set[String]] = {
     lazy val expressionIndexPartitionOpt = 
getExpressionIndexPartitionAndLiterals(queryFilters)
     if (isIndexAvailable && queryFilters.nonEmpty && 
expressionIndexPartitionOpt.nonEmpty) {
-      val (indexPartition, literals) = expressionIndexPartitionOpt.get
+      val (indexPartition, expressionIndexQuery, literals) = 
expressionIndexPartitionOpt.get
       val indexDefinition = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
       if 
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS))
 {
         val readInMemory = shouldReadInMemory(fileIndex, 
queryReferencedColumns, inMemoryProjectionThreshold)
         val (prunedPartitions, prunedFileNames) = 
getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
-        val indexDf = loadExpressionIndexDataFrame(indexPartition, 
prunedPartitions, readInMemory)
-        Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+        val expressionIndexRecords = 
loadExpressionIndexRecords(indexPartition, prunedPartitions, readInMemory)
+        loadTransposed(queryReferencedColumns, readInMemory, 
expressionIndexRecords, expressionIndexQuery) {
+          transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF, 
Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true))
+        }
       } else if 
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS))
 {
         val prunedPartitionAndFileNames = 
getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices, 
includeLogFiles = true)
         Option.apply(getCandidateFilesForKeys(indexPartition, 
prunedPartitionAndFileNames, literals))
@@ -78,8 +93,238 @@ class ExpressionIndexSupport(spark: SparkSession,
     }
   }
 
+  /**
+   * Loads view of the Column Stats Index in a transposed format where single 
row coalesces every columns'
+   * statistics for a single file, returning it as [[DataFrame]]
+   *
+   * Please check out scala-doc of the [[transpose]] method explaining this 
view in more details
+   */
+  def loadTransposed[T](targetColumns: Seq[String],
+                        shouldReadInMemory: Boolean,
+                        colStatRecords: HoodieData[HoodieMetadataColumnStats],
+                        expressionIndexQuery: Expression) (block: DataFrame => 
T): T = {
+    cachedColumnStatsIndexViews.get(targetColumns) match {
+      case Some(cachedDF) =>
+        block(cachedDF)
+      case None =>
+        val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = 
colStatRecords
+        withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
+          val (transposedRows, indexSchema) = transpose(colStatsRecords, 
targetColumns, expressionIndexQuery)
+          val df = if (shouldReadInMemory) {
+            // NOTE: This will instantiate a [[Dataset]] backed by 
[[LocalRelation]] holding all of the rows
+            //       of the transposed table in memory, facilitating execution 
of the subsequently chained operations
+            //       on it locally (on the driver; all such operations are 
actually going to be performed by Spark's
+            //       Optimizer)
+            createDataFrameFromRows(spark, 
transposedRows.collectAsList().asScala.toSeq, indexSchema)
+          } else {
+            val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
+            spark.createDataFrame(rdd, indexSchema)
+          }
+
+          val allowCaching: Boolean = false
+          if (allowCaching) {
+            cachedColumnStatsIndexViews.put(targetColumns, df)
+            // NOTE: Instead of collecting the rows from the index and hold 
them in memory, we instead rely
+            //       on Spark as (potentially distributed) cache managing data 
lifecycle, while we simply keep
+            //       the referenced to persisted [[DataFrame]] instance
+            df.persist(StorageLevel.MEMORY_ONLY)
+
+            block(df)
+          } else {
+            withPersistedDataset(df) {
+              block(df)
+            }
+          }
+        }
+    }
+  }
+
+  /**
+   * Transposes and converts the raw table format of the Column Stats Index 
representation,
+   * where each row/record corresponds to individual (column, file) pair, into 
the table format
+   * where each row corresponds to single file with statistic for individual 
columns collated
+   * w/in such row:
+   *
+   * Metadata Table Column Stats Index format:
+   *
+   * <pre>
+   *  
+---------------------------+------------+------------+------------+-------------+
+   *  |        fileName           | columnName |  minValue  |  maxValue  |  
num_nulls  |
+   *  
+---------------------------+------------+------------+------------+-------------+
+   *  | one_base_file.parquet     |          A |          1 |         10 |     
      0 |
+   *  | another_base_file.parquet |          A |        -10 |          0 |     
      5 |
+   *  
+---------------------------+------------+------------+------------+-------------+
+   * </pre>
+   *
+   * Returned table format
+   *
+   * <pre>
+   *  +---------------------------+------------+------------+-------------+
+   *  |          file             | A_minValue | A_maxValue | A_nullCount |
+   *  +---------------------------+------------+------------+-------------+
+   *  | one_base_file.parquet     |          1 |         10 |           0 |
+   *  | another_base_file.parquet |        -10 |          0 |           5 |
+   *  +---------------------------+------------+------------+-------------+
+   * </pre>
+   *
+   * NOTE: Column Stats Index might potentially contain statistics for many 
columns (if not all), while
+   *       query at hand might only be referencing a handful of those. As 
such, we collect all the
+   *       column references from the filtering expressions, and only 
transpose records corresponding to the
+   *       columns referenced in those
+   *
+   * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing 
raw Column Stats Index records
+   * @param queryColumns target columns to be included into the final table
+   * @return reshaped table according to the format outlined above
+   */
+  private def transpose(colStatsRecords: 
HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String], 
expressionIndexQuery: Expression): (HoodieData[Row], StructType) = {
+    val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
+    // NOTE: We're sorting the columns to make sure final index schema matches 
layout
+    //       of the transposed table
+    val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+
+    // NOTE: This is a trick to avoid pulling all of 
[[ColumnStatsIndexSupport]] object into the lambdas'
+    //       closures below
+    val indexedColumns = queryColumns.toSet
+
+    // NOTE: It's crucial to maintain appropriate ordering of the columns
+    //       matching table layout: hence, we cherry-pick individual columns
+    //       instead of simply filtering in the ones we're interested in the 
schema
+    val (indexSchema, targetIndexedColumns) = 
composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns, tableSchema, 
expressionIndexQuery)
+
+    // Here we perform complex transformation which requires us to modify the 
layout of the rows
+    // of the dataset, and therefore we rely on low-level RDD API to avoid 
incurring encoding/decoding
+    // penalty of the [[Dataset]], since it's required to adhere to its schema 
at all times, while
+    // RDDs are not;
+    val transposedRows: HoodieData[Row] = colStatsRecords
+      //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
+      .filter(JFunction.toJavaSerializableFunction(r => 
sortedTargetColumnsSet.contains(r.getColumnName)))
+      .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+        if (r.getMinValue == null && r.getMaxValue == null) {
+          // Corresponding row could be null in either of the 2 cases
+          //    - Column contains only null values (in that case both min/max 
have to be nulls)
+          //    - This is a stubbed Column Stats record (used as a tombstone)
+          collection.Pair.of(r.getFileName, r)
+        } else {
+          val minValueWrapper = r.getMinValue
+          val maxValueWrapper = r.getMaxValue
+
+          checkState(minValueWrapper != null && maxValueWrapper != null, 
"Invalid Column Stats record: either both min/max have to be null, or both have 
to be non-null")
+
+          val colName = r.getColumnName
+          val colType = tableSchemaFieldMap(colName).dataType
+
+          val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper), 
colType)
+          val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper), 
colType)
+
+          // Update min-/max-value structs w/ unwrapped values in-place
+          r.setMinValue(minValue)
+          r.setMaxValue(maxValue)
+
+          collection.Pair.of(r.getFileName, r)
+        }
+      }))
+      .groupByKey()
+      .map(JFunction.toJavaSerializableFunction(p => {
+        val columnRecordsSeq: Seq[HoodieMetadataColumnStats] = 
p.getValue.asScala.toSeq
+        val fileName: String = p.getKey
+        val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+        // To properly align individual rows (corresponding to a file) w/in 
the transposed projection, we need
+        // to align existing column-stats for individual file with the list of 
expected ones for the
+        // whole transposed projection (a superset of all files)
+        val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName, 
r)).toMap
+        val alignedColStatRecordsSeq = 
targetIndexedColumns.map(columnRecordsMap.get)
+
+        val coalescedRowValuesSeq =
+          alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName, 
valueCount)) {
+            case (acc, opt) =>
+              opt match {
+                case Some(colStatRecord) =>
+                  acc ++= Seq(colStatRecord.getMinValue, 
colStatRecord.getMaxValue, colStatRecord.getNullCount)
+                case None =>
+                  // NOTE: This could occur in either of the following cases:
+                  //    1. When certain columns exist in the schema but are 
absent in some data files due to
+                  //       schema evolution or other reasons, these columns 
will not be present in the column stats.
+                  //       In this case, we fill in default values by setting 
the min, max and null-count to null
+                  //       (this behavior is consistent with reading 
non-existent columns from Parquet).
+                  //    2. When certain columns are present both in the schema 
and the data files,
+                  //       but the column stats are absent for these columns 
due to their types not supporting indexing,
+                  //       we also set these columns to default values.
+                  //
+                  // This approach prevents errors during data skipping and, 
because the filter includes an isNull check,
+                  // these conditions will not affect the accurate return of 
files from data skipping.
+                  acc ++= Seq(null, null, null)
+              }
+          }
+
+        Row(coalescedRowValuesSeq.toSeq: _*)
+      }))
+
+    (transposedRows, indexSchema)
+  }
+
+  def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns: 
Set[String], tableSchema: StructType, expressionIndexQuery: Expression): 
(StructType, Seq[String]) = {
+    val fileNameField = 
StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType, 
nullable = true, Metadata.empty)
+    val valueCountField = 
StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType, 
nullable = true, Metadata.empty)
+
+    val targetIndexedColumns = 
targetColumnNames.filter(indexedColumns.contains(_))
+    val targetIndexedFields = targetIndexedColumns.map(colName => 
tableSchema.fields.find(f => f.name == colName).get)
+
+    val dataType: DataType = expressionIndexQuery match {
+      case eq: EqualTo => eq.right.asInstanceOf[Literal].dataType
+      case in: In => in.list(0).asInstanceOf[Literal].dataType
+      case _ => targetIndexedFields(0).dataType
+    }
+
+    (StructType(
+      targetIndexedFields.foldLeft(Seq(fileNameField, valueCountField)) {
+        case (acc, field) =>
+          acc ++ Seq(
+            composeColumnStatStructType(field.name, 
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, dataType),
+            composeColumnStatStructType(field.name, 
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, dataType),
+            composeColumnStatStructType(field.name, 
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, LongType))
+      }
+    ), targetIndexedColumns)
+  }
+
+  def loadColumnStatsIndexRecords(targetColumns: Seq[String], 
prunedPartitions: Option[Set[String]] = None, shouldReadInMemory: Boolean): 
HoodieData[HoodieMetadataColumnStats] = {
+    // Read Metadata Table's Column Stats Index records into [[HoodieData]] 
container by
+    //    - Fetching the records from CSI by key-prefixes (encoded column 
names)
+    //    - Extracting [[HoodieMetadataColumnStats]] records
+    //    - Filtering out nulls
+    checkState(targetColumns.nonEmpty)
+
+    // TODO encoding should be done internally w/in HoodieBackedTableMetadata
+    val encodedTargetColumnNames = targetColumns.map(colName => new 
ColumnIndexID(colName).asBase64EncodedString())
+    // encode column name and partition name if partition list is available
+    val keyPrefixes = if (prunedPartitions.isDefined) {
+      prunedPartitions.get.map(partitionPath =>
+        new 
PartitionIndexID(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath)).asBase64EncodedString()
+      ).flatMap(encodedPartition => {
+        encodedTargetColumnNames.map(encodedTargetColumn => 
encodedTargetColumn.concat(encodedPartition))
+      })
+    } else {
+      encodedTargetColumnNames
+    }
+
+    val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
+      metadataTable.getRecordsByKeyPrefixes(keyPrefixes.toSeq.asJava, 
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
+
+    val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+      //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
+      metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
+          toScalaOption(record.getData.getInsertValue(null, null))
+            .map(metadataRecord => 
metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+            .orNull
+        }))
+        .filter(JFunction.toJavaSerializableFunction(columnStatsRecord => 
columnStatsRecord != null))
+
+    columnStatsRecords
+  }
+
   override def invalidateCaches(): Unit = {
-    // no caches for this index type, do nothing
+    cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
+    cachedColumnStatsIndexViews.clear()
   }
 
   /**
@@ -89,15 +334,17 @@ class ExpressionIndexSupport(spark: SparkSession,
     metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent && 
!metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
   }
 
-  def filterQueriesWithFunctionalFilterKey(queryFilters: Seq[Expression], 
sourceFieldOpt: Option[String]): List[Tuple2[Expression, List[String]]] = {
+  private def filterQueriesWithFunctionalFilterKey(queryFilters: 
Seq[Expression], sourceFieldOpt: Option[String]): List[Tuple2[Expression, 
List[String]]] = {
     var expressionIndexQueries: List[Tuple2[Expression, List[String]]] = 
List.empty
     for (query <- queryFilters) {
-      filterQueryWithRecordKey(query, sourceFieldOpt, (expr: Expression) => {
+      val attributeFetcher = (expr: Expression) => {
         expr match {
           case expression: UnaryExpression => expression.child
+          case expression: FromUnixTime => expression.sec
           case other => other
         }
-      }).foreach({
+      }
+      filterQueryWithRecordKey(query, sourceFieldOpt, 
attributeFetcher).foreach({
         case (exp: Expression, literals: List[String]) =>
           expressionIndexQueries = expressionIndexQueries :+ Tuple2.apply(exp, 
literals)
       })
@@ -119,17 +366,17 @@ class ExpressionIndexSupport(spark: SparkSession,
    * @return An `Option` containing the index partition identifier if a 
matching index definition is found.
    *         Returns `None` if no matching index definition is found.
    */
-  private def getExpressionIndexPartitionAndLiterals(queryFilters: 
Seq[Expression]): Option[Tuple2[String, List[String]]] = {
+  private def getExpressionIndexPartitionAndLiterals(queryFilters: 
Seq[Expression]): Option[Tuple3[String, Expression, List[String]]] = {
     val indexDefinitions = 
metaClient.getIndexMetadata.get().getIndexDefinitions.asScala
     if (indexDefinitions.nonEmpty) {
       val functionDefinitions = indexDefinitions.values
         .filter(definition => 
MetadataPartitionType.fromPartitionPath(definition.getIndexName).equals(MetadataPartitionType.EXPRESSION_INDEX))
         .toList
-      var indexPartitionAndLiteralsOpt: Option[Tuple2[String, List[String]]] = 
Option.empty
+      var indexPartitionAndLiteralsOpt: Option[Tuple3[String, Expression, 
List[String]]] = Option.empty
       functionDefinitions.foreach(indexDefinition => {
         val queryInfoOpt = extractQueryAndLiterals(queryFilters, 
indexDefinition)
         if (queryInfoOpt.isDefined) {
-          indexPartitionAndLiteralsOpt = 
Option.apply(Tuple2.apply(indexDefinition.getIndexName, queryInfoOpt.get._2))
+          indexPartitionAndLiteralsOpt = 
Option.apply(Tuple3.apply(indexDefinition.getIndexName, queryInfoOpt.get._1, 
queryInfoOpt.get._2))
         }
       })
       indexPartitionAndLiteralsOpt
@@ -155,12 +402,38 @@ class ExpressionIndexSupport(spark: SparkSession,
       val functionNameOption = 
SPARK_FUNCTION_MAP.asScala.keys.find(expr.toString.contains)
       val functionName = functionNameOption.getOrElse("identity")
       if (indexDefinition.getIndexFunction.equals(functionName)) {
-        queryAndLiteralsOpt = Option.apply(Tuple2.apply(expr, literals))
+        val attributeFetcher = (expr: Expression) => {
+          expr match {
+            case expression: UnaryExpression => expression.child
+            case expression: FromUnixTime => expression.sec
+            case other => other
+          }
+        }
+        if (functionName.equals(SPARK_FROM_UNIXTIME)) {
+          val configuredFormat = 
indexDefinition.getIndexOptions.getOrDefault("format", 
TimestampFormatter.defaultPattern)
+          if (expr.toString().contains(configuredFormat)) {
+            val pruningExpr = fetchQueryWithAttribute(expr, 
Option.apply(indexDefinition.getSourceFields.get(0)), 
RecordLevelIndexSupport.getSimpleLiteralGenerator(), attributeFetcher)._1.get._1
+            queryAndLiteralsOpt = Option.apply(Tuple2.apply(pruningExpr, 
literals))
+          }
+        } else {
+          val pruningExpr = fetchQueryWithAttribute(expr, 
Option.apply(indexDefinition.getSourceFields.get(0)), 
RecordLevelIndexSupport.getSimpleLiteralGenerator(), attributeFetcher)._1.get._1
+          queryAndLiteralsOpt = Option.apply(Tuple2.apply(pruningExpr, 
literals))
+        }
       }
     }
     queryAndLiteralsOpt
   }
 
+  private def loadExpressionIndexRecords(indexPartition: String,
+                                         prunedPartitions: Set[String],
+                                         shouldReadInMemory: Boolean): 
HoodieData[HoodieMetadataColumnStats] = {
+    val indexDefinition = 
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
+    val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = 
loadExpressionIndexForColumnsInternal(
+      indexDefinition.getSourceFields.asScala.toSeq, prunedPartitions, 
indexPartition, shouldReadInMemory)
+    //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
+    colStatsRecords
+  }
+
   def loadExpressionIndexDataFrame(indexPartition: String,
                                    prunedPartitions: Set[String],
                                    shouldReadInMemory: Boolean): DataFrame = {
@@ -169,10 +442,10 @@ class ExpressionIndexSupport(spark: SparkSession,
       val colStatsRecords: HoodieData[HoodieMetadataColumnStats] = 
loadExpressionIndexForColumnsInternal(
         indexDefinition.getSourceFields.asScala.toSeq, prunedPartitions, 
indexPartition, shouldReadInMemory)
       //TODO: [HUDI-8303] Explicit conversion might not be required for Scala 
2.12+
-      val catalystRows: HoodieData[InternalRow] = 
colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+      val catalystRows: HoodieData[InternalRow] = 
colStatsRecords.map[InternalRow](JFunction.toJavaSerializableFunction(r => {
         val converter = 
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
 columnStatsRecordStructType)
-        it.asScala.map(r => converter(r).orNull).asJava
-      }), false)
+        converter(r).orNull
+      }))
 
       if (shouldReadInMemory) {
         // NOTE: This will instantiate a [[Dataset]] backed by 
[[LocalRelation]] holding all of the rows
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 225d806f92b..3d1aea3ffa3 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -112,7 +112,7 @@ case class HoodieFileIndex(spark: SparkSession,
     new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
     new BucketIndexSupport(spark, metadataConfig, metaClient),
     new SecondaryIndexSupport(spark, metadataConfig, metaClient),
-    new ExpressionIndexSupport(spark, metadataConfig, metaClient),
+    new ExpressionIndexSupport(spark, schema, metadataConfig, metaClient),
     new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
     new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
   )
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 454014b9728..1cb20026876 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -229,6 +229,80 @@ object RecordLevelIndexSupport {
     }
   }
 
+  def fetchQueryWithAttribute(queryFilter: Expression, recordKeyOpt: 
Option[String], literalGenerator: Function2[AttributeReference, Literal, 
String],
+                               attributeFetcher: Function1[Expression, 
Expression]): (Option[(Expression, List[String])], Boolean) = {
+    queryFilter match {
+      case equalToQuery: EqualTo =>
+        val attributeLiteralTuple = 
getAttributeLiteralTuple(attributeFetcher.apply(equalToQuery.left), 
attributeFetcher.apply(equalToQuery.right)).orNull
+        if (attributeLiteralTuple != null) {
+          val attribute = attributeLiteralTuple._1
+          val literal = attributeLiteralTuple._2
+          if (attribute != null && attribute.name != null && 
attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+            val recordKeyLiteral = literalGenerator.apply(attribute, literal)
+            (Option.apply(EqualTo(attribute, literal), 
List.apply(recordKeyLiteral)), true)
+          } else {
+            (Option.empty, true)
+          }
+        } else {
+          (Option.empty, true)
+        }
+
+      case inQuery: In =>
+        var validINQuery = true
+        val attributeOpt = Option.apply(
+          attributeFetcher.apply(inQuery.value) match {
+            case attribute: AttributeReference =>
+              if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+                validINQuery = false
+                null
+              } else {
+                attribute
+              }
+            case _ =>
+              validINQuery = false
+              null
+          })
+        var literals: List[String] = List.empty
+        inQuery.list.foreach {
+          case literal: Literal if attributeOpt.isDefined =>
+            val recordKeyLiteral = literalGenerator.apply(attributeOpt.get, 
literal)
+            literals = literals :+ recordKeyLiteral
+          case _ => validINQuery = false
+        }
+        if (validINQuery) {
+          (Option.apply(In(attributeOpt.get, inQuery.list), literals), true)
+        } else {
+          (Option.empty, true)
+        }
+
+      // Handle And expression (composite filter)
+      case andQuery: And =>
+        val leftResult = filterQueryWithRecordKey(andQuery.left, recordKeyOpt, 
literalGenerator, attributeFetcher)
+        val rightResult = filterQueryWithRecordKey(andQuery.right, 
recordKeyOpt, literalGenerator, attributeFetcher)
+
+        val isSupported = leftResult._2 && rightResult._2
+        if (!isSupported) {
+          (Option.empty, false)
+        } else {
+          // If both left and right filters are valid, concatenate their 
results
+          (leftResult._1, rightResult._1) match {
+            case (Some((leftExp, leftKeys)), Some((rightExp, rightKeys))) =>
+              // Return concatenated expressions and record keys
+              (Option.apply(And(leftExp, rightExp), leftKeys ++ rightKeys), 
true)
+            case (Some((leftExp, leftKeys)), None) =>
+              // Return concatenated expressions and record keys
+              (Option.apply(leftExp, leftKeys), true)
+            case (None, Some((rightExp, rightKeys))) =>
+              // Return concatenated expressions and record keys
+              (Option.apply(rightExp, rightKeys), true)
+            case _ => (Option.empty, true)
+          }
+        }
+
+      case _ => (Option.empty, false)
+    }
+  }
+
   /**
    * Returns the list of storage paths from the pruned partitions and file 
slices.
    *
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index ad0b7bcd6b6..c698ad37ee8 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -99,9 +99,9 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
     (prunedPartitions, prunedFiles)
   }
 
-  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: 
Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
+  protected def getCandidateFiles(indexDf: DataFrame, queryFilters: 
Seq[Expression], prunedFileNames: Set[String], isExpressionIndex: Boolean = 
false): Set[String] = {
     val indexSchema = indexDf.schema
-    val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, 
indexSchema)).reduce(And)
+    val indexFilter = 
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema, 
isExpressionIndex)).reduce(And)
     val prunedCandidateFileNames =
       indexDf.where(new Column(indexFilter))
         .select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index b9252eb831d..6e74c2f1e35 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -33,16 +33,18 @@ import org.apache.spark.unsafe.types.UTF8String
 object DataSkippingUtils extends Logging {
 
   /**
-   * Translates provided {@link filterExpr} into corresponding 
filter-expression for column-stats index index table
-   * to filter out candidate files that would hold records matching the 
original filter
+   * Translates provided {@link filterExpr} into corresponding 
filter-expression for column-stats index table
+   * to filter out candidate files that would hold records matching the 
original filter.
+   * In case the column stats were created using an expression index, the index 
filter must also account for the expression.
    *
    * @param dataTableFilterExpr source table's query's filter expression
    * @param indexSchema index table schema
+   * @param isExpressionIndex whether the index is an expression index
    * @return filter for column-stats index's table
    */
-  def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression, 
indexSchema: StructType): Expression = {
+  def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression, 
indexSchema: StructType, isExpressionIndex: Boolean = false): Expression = {
     try {
-      createColumnStatsIndexFilterExprInternal(dataTableFilterExpr, 
indexSchema)
+      createColumnStatsIndexFilterExprInternal(dataTableFilterExpr, 
indexSchema, isExpressionIndex)
     } catch {
       case e: AnalysisException =>
         logDebug(s"Failed to translated provided data table filter expr into 
column stats one ($dataTableFilterExpr)", e)
@@ -50,10 +52,10 @@ object DataSkippingUtils extends Logging {
     }
   }
 
-  private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr: 
Expression, indexSchema: StructType): Expression = {
+  private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr: 
Expression, indexSchema: StructType, isExpressionIndex: Boolean = false): 
Expression = {
     // Try to transform original Source Table's filter expression into
     // Column-Stats Index filter expression
-    tryComposeIndexFilterExpr(dataTableFilterExpr, indexSchema) match {
+    tryComposeIndexFilterExpr(dataTableFilterExpr, indexSchema, 
isExpressionIndex) match {
       case Some(e) => e
       // NOTE: In case we can't transform source filter expression, we fallback
       // to {@code TrueLiteral}, to essentially avoid pruning any indexed 
files from scanning
@@ -61,7 +63,7 @@ object DataSkippingUtils extends Logging {
     }
   }
 
-  private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression, 
indexSchema: StructType): Option[Expression] = {
+  private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression, 
indexSchema: StructType, isExpressionIndex: Boolean = false): 
Option[Expression] = {
     //
     // For translation of the Filter Expression for the Data Table into Filter 
Expression for Column Stats Index, we're
     // assuming that
@@ -93,7 +95,7 @@ object DataSkippingUtils extends Logging {
     //
     sourceFilterExpr match {
       // If Expression is not resolved, we can't perform the analysis 
accurately, bailing
-      case expr if !expr.resolved => None
+      case expr if !expr.resolved && !isExpressionIndex => None
 
       // Filter "expr(colA) = B" and "B = expr(colA)"
       // Translates to "(expr(colA_minValue) <= B) AND (B <= 
expr(colA_maxValue))" condition for index lookup
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
index e1604c98892..d4f230adc37 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
@@ -19,12 +19,11 @@
 
 package org.apache.spark.sql.hudi.command.index
 
-import org.apache.hudi.DataSourceWriteOptions.{INSERT_OPERATION_OPT_VAL, 
OPERATION, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.DataSourceWriteOptions.{HIVE_PASS, HIVE_USER, 
HIVE_USE_PRE_APACHE_INPUT_FORMAT, INSERT_OPERATION_OPT_VAL, OPERATION, 
PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
 import org.apache.hudi.HoodieConversionUtils.toProperties
 import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.client.utils.SparkMetadataWriterUtils
-import org.apache.hudi.{DataSourceReadOptions, ExpressionIndexSupport, 
HoodieFileIndex, HoodieSparkUtils}
 import org.apache.hudi.common.config.{HoodieMetadataConfig, 
HoodieStorageConfig, TypedProperties}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.FileSlice
@@ -33,7 +32,6 @@ import org.apache.hudi.common.table.view.FileSystemViewManager
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig, 
HoodieIndexConfig, HoodieWriteConfig}
-import org.apache.hudi.hive.HiveSyncConfigHolder._
 import org.apache.hudi.hive.testutils.HiveTestUtil
 import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
 import org.apache.hudi.index.HoodieIndex
@@ -44,8 +42,10 @@ import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH, 
META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
 import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, ExpressionIndexSupport, 
HoodieFileIndex, HoodieSparkUtils}
 import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.{Column, SaveMode}
+import org.apache.spark.sql.Column.unapply
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, FromUnixTime, Literal, Upper}
@@ -53,7 +53,8 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.hudi.command.{CreateIndexCommand, 
ShowIndexesCommand}
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.apache.spark.sql.types.{BinaryType, ByteType, DateType, 
DecimalType, IntegerType, ShortType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{BinaryType, ByteType, DateType, 
DecimalType, DoubleType, IntegerType, LongType, ShortType, StringType, 
StructField, StructType, TimestampType}
+import org.apache.spark.sql.{Column, SaveMode, functions}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
 import org.scalatest.Ignore
@@ -442,6 +443,10 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
                | partitioned by(price)
                | location '$basePath'
        """.stripMargin)
+          if (HoodieSparkUtils.gteqSpark3_4) {
+            spark.sql("set spark.sql.defaultColumn.enabled=false")
+          }
+
           spark.sql(s"insert into $tableName (id, name, ts, price) values(1, 
'a1', 1000, 10)")
           spark.sql(s"insert into $tableName (id, name, ts, price) values(2, 
'a2', 200000, 100)")
           spark.sql(s"insert into $tableName (id, name, ts, price) values(3, 
'a3', 2000000000, 1000)")
@@ -758,6 +763,99 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
     }
   }
 
+  /**
+   * Test expression index with data skipping for unary expression and binary 
expression.
+   */
+  @Test
+  def testColumnStatsPruningWithUnaryBinaryExpr(): Unit = {
+    if (HoodieSparkUtils.gteqSpark3_3) {
+      withTempDir { tmp =>
+        Seq("cow", "mor").foreach { tableType =>
+          val tableName = generateTableName + 
s"_stats_pruning_binary_$tableType"
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+          spark.sql(
+            s"""
+           CREATE TABLE $tableName (
+               |    ts LONG,
+               |    id STRING,
+               |    rider STRING,
+               |    driver STRING,
+               |    fare DOUBLE,
+               |    city STRING,
+               |    state STRING
+               |) USING HUDI
+               |options(
+               |    primaryKey ='id',
+               |    type = '$tableType',
+               |    hoodie.metadata.enable = 'true',
+               |    hoodie.datasource.write.recordkey.field = 'id',
+               |    hoodie.enable.data.skipping = 'true'
+               |)
+               |PARTITIONED BY (state)
+               |location '$basePath'
+               |""".stripMargin)
+
+          spark.sql("set hoodie.parquet.small.file.limit=0")
+          if (HoodieSparkUtils.gteqSpark3_4) {
+            spark.sql("set spark.sql.defaultColumn.enabled=false")
+          }
+
+          spark.sql(
+            s"""
+               |insert into $tableName(ts, id, rider, driver, fare, city, 
state) VALUES
+               |  
(1695414527,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+               |  
(1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
+               |  
(1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+               |  
(1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+               |""".stripMargin)
+          spark.sql(
+            s"""
+               |insert into $tableName(ts, id, rider, driver, fare, city, 
state) VALUES
+               |  
(1695414520,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+               |  
(1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+               |""".stripMargin)
+
+          // With unary expression
+          spark.sql(s"create index idx_rider on $tableName using 
column_stats(rider) options(expr='lower')")
+          // With binary expression
+          spark.sql(s"create index idx_datestr on $tableName using 
column_stats(ts) options(expr='from_unixtime', format='yyyy-MM-dd')")
+          // validate index created successfully
+          var metaClient = createMetaClient(spark, basePath)
+          assertTrue(metaClient.getIndexMetadata.isPresent)
+          val expressionIndexMetadata = metaClient.getIndexMetadata.get()
+          assertEquals("expr_index_idx_datestr", 
expressionIndexMetadata.getIndexDefinitions.get("expr_index_idx_datestr").getIndexName)
+
+          val tableSchema: StructType =
+            StructType(
+              Seq(
+                StructField("ts", LongType),
+                StructField("id", StringType),
+                StructField("rider", StringType),
+                StructField("driver", StringType),
+                StructField("fare", DoubleType),
+                StructField("city", StringType),
+                StructField("state", StringType)
+              )
+            )
+          val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key 
-> "true", HoodieMetadataConfig.ENABLE.key -> "true")
+          metaClient = createMetaClient(spark, basePath)
+
+          // validate skipping with both types of expression
+          val lowerExpr = resolveExpr(spark, 
unapply(functions.lower(functions.col("rider"))).get, tableSchema)
+          var literal = Literal.create("rider-c")
+          var dataFilter = EqualTo(lowerExpr, literal)
+          verifyFilePruning(opts, dataFilter, metaClient, 
isDataSkippingExpected = true)
+
+          val fromUnixTime = resolveExpr(spark, 
unapply(functions.from_unixtime(functions.col("ts"), "yyyy-MM-dd")).get, 
tableSchema)
+          literal = Literal.create("2023-11-07")
+          dataFilter = EqualTo(fromUnixTime, literal)
+          verifyFilePruning(opts, dataFilter, metaClient, 
isDataSkippingExpected = true)
+        }
+      }
+    }
+  }
+
   @Test
   def testBloomFiltersIndexPruning(): Unit = {
     if (HoodieSparkUtils.gteqSpark3_3) {
@@ -1054,7 +1152,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
         val metadataConfig = HoodieMetadataConfig.newBuilder()
           .fromProperties(toProperties(metadataOpts))
           .build()
-        val expressionIndexSupport = new ExpressionIndexSupport(spark, 
metadataConfig, metaClient)
+        val expressionIndexSupport = new ExpressionIndexSupport(spark, null, 
metadataConfig, metaClient)
         val prunedPartitions = Set("9")
         var indexDf = 
expressionIndexSupport.loadExpressionIndexDataFrame("expr_index_idx_datestr", 
prunedPartitions, shouldReadInMemory = true)
         // check only one record returned corresponding to the pruned partition
@@ -1144,7 +1242,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
           .build()
         val fileIndex = new HoodieFileIndex(spark, metaClient, None,
           opts ++ metadataOpts ++ Map("glob.paths" -> s"$basePath/9", 
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"), includeLogFiles = 
true)
-        val expressionIndexSupport = new ExpressionIndexSupport(spark, 
metadataConfig, metaClient)
+        val expressionIndexSupport = new ExpressionIndexSupport(spark, null, 
metadataConfig, metaClient)
         val partitionFilter: Expression = EqualTo(AttributeReference("c8", 
IntegerType)(), Literal(9))
         val (isPruned, prunedPaths) = 
fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq(partitionFilter))
         assertTrue(isPruned)


Reply via email to