This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a1cf2439e98 [HUDI-8643] Filter records for from_unixtime function with
expression index (#12416)
a1cf2439e98 is described below
commit a1cf2439e983a35c53c9c815a4c8e64e05af615f
Author: Lokesh Jain <[email protected]>
AuthorDate: Wed Dec 4 11:06:32 2024 +0530
[HUDI-8643] Filter records for from_unixtime function with expression index
(#12416)
* [HUDI-8643] Filter records for from_unixtime function with expression
index
* fix test by using diff table name
* minor code cleanup
---------
Co-authored-by: Sagar Sumit <[email protected]>
---
.../org/apache/hudi/ColumnStatsIndexSupport.scala | 6 +-
.../org/apache/hudi/ExpressionIndexSupport.scala | 311 +++++++++++++++++++--
.../scala/org/apache/hudi/HoodieFileIndex.scala | 2 +-
.../org/apache/hudi/RecordLevelIndexSupport.scala | 74 +++++
.../org/apache/hudi/SparkBaseIndexSupport.scala | 4 +-
.../apache/spark/sql/hudi/DataSkippingUtils.scala | 18 +-
.../hudi/command/index/TestExpressionIndex.scala | 112 +++++++-
7 files changed, 487 insertions(+), 40 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
index fc1f15ad409..1becf688324 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ColumnStatsIndexSupport.scala
@@ -435,10 +435,10 @@ object ColumnStatsIndexSupport {
String.format("%s_%s", col, statName)
}
- @inline private def composeColumnStatStructType(col: String, statName:
String, dataType: DataType) =
+ @inline def composeColumnStatStructType(col: String, statName: String,
dataType: DataType) =
StructField(formatColName(col, statName), dataType, nullable = true,
Metadata.empty)
- private def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
+ def tryUnpackValueWrapper(valueWrapper: AnyRef): Any = {
valueWrapper match {
case w: BooleanWrapper => w.getValue
case w: IntWrapper => w.getValue
@@ -461,7 +461,7 @@ object ColumnStatsIndexSupport {
val decConv = new DecimalConversion()
- private def deserialize(value: Any, dataType: DataType): Any = {
+ def deserialize(value: Any, dataType: DataType): Any = {
dataType match {
// NOTE: Since we can't rely on Avro's "date", and "timestamp-micros"
logical-types, we're
// manually encoding corresponding values as int and long w/in the
Column Stats Index and
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
index 37e27ccd1a1..154498b57ec 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/ExpressionIndexSupport.scala
@@ -19,34 +19,47 @@
package org.apache.hudi
+import org.apache.hudi.ColumnStatsIndexSupport.{composeColumnStatStructType,
deserialize, tryUnpackValueWrapper}
import org.apache.hudi.ExpressionIndexSupport._
+import org.apache.hudi.HoodieCatalystUtils.{withPersistedData,
withPersistedDataset}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieSparkExpressionIndex.SPARK_FUNCTION_MAP
-import org.apache.hudi.RecordLevelIndexSupport.filterQueryWithRecordKey
+import org.apache.hudi.RecordLevelIndexSupport.{fetchQueryWithAttribute,
filterQueryWithRecordKey}
import org.apache.hudi.avro.model.{HoodieMetadataColumnStats,
HoodieMetadataRecord}
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.data.HoodieData
import org.apache.hudi.common.model.{FileSlice, HoodieIndexDefinition,
HoodieRecord}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.collection
import org.apache.hudi.common.util.hash.{ColumnIndexID, PartitionIndexID}
import org.apache.hudi.data.HoodieJavaRDD
+import
org.apache.hudi.index.functional.HoodieExpressionIndex.SPARK_FROM_UNIXTIME
import org.apache.hudi.metadata.{HoodieMetadataPayload,
HoodieTableMetadataUtil, MetadataPartitionType}
import org.apache.hudi.util.JFunction
-import
org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows,
createDataFrameFromRDD}
+import
org.apache.spark.sql.HoodieUnsafeUtils.{createDataFrameFromInternalRows,
createDataFrameFromRDD, createDataFrameFromRows}
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
+import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression,
FromUnixTime, In, Literal, UnaryExpression}
+import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.functions.col
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
import scala.collection.JavaConverters._
+import scala.collection.immutable.TreeSet
+import scala.collection.mutable.ListBuffer
+import scala.collection.parallel.mutable.ParHashMap
class ExpressionIndexSupport(spark: SparkSession,
+ tableSchema: StructType,
metadataConfig: HoodieMetadataConfig,
metaClient: HoodieTableMetaClient)
extends SparkBaseIndexSupport (spark, metadataConfig, metaClient) {
+ @transient private lazy val cachedColumnStatsIndexViews:
ParHashMap[Seq[String], DataFrame] = ParHashMap()
+
+
// NOTE: Since [[metadataConfig]] is transient this has to be eagerly
persisted, before this will be passed on to the executor
private val inMemoryProjectionThreshold =
metadataConfig.getColumnStatsIndexInMemoryProjectionThreshold
@@ -60,13 +73,15 @@ class ExpressionIndexSupport(spark: SparkSession,
): Option[Set[String]] = {
lazy val expressionIndexPartitionOpt =
getExpressionIndexPartitionAndLiterals(queryFilters)
if (isIndexAvailable && queryFilters.nonEmpty &&
expressionIndexPartitionOpt.nonEmpty) {
- val (indexPartition, literals) = expressionIndexPartitionOpt.get
+ val (indexPartition, expressionIndexQuery, literals) =
expressionIndexPartitionOpt.get
val indexDefinition =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
if
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS))
{
val readInMemory = shouldReadInMemory(fileIndex,
queryReferencedColumns, inMemoryProjectionThreshold)
val (prunedPartitions, prunedFileNames) =
getPrunedPartitionsAndFileNames(fileIndex, prunedPartitionsAndFileSlices)
- val indexDf = loadExpressionIndexDataFrame(indexPartition,
prunedPartitions, readInMemory)
- Some(getCandidateFiles(indexDf, queryFilters, prunedFileNames))
+ val expressionIndexRecords =
loadExpressionIndexRecords(indexPartition, prunedPartitions, readInMemory)
+ loadTransposed(queryReferencedColumns, readInMemory,
expressionIndexRecords, expressionIndexQuery) {
+ transposedColStatsDF =>Some(getCandidateFiles(transposedColStatsDF,
Seq(expressionIndexQuery), prunedFileNames, isExpressionIndex = true))
+ }
} else if
(indexDefinition.getIndexType.equals(HoodieTableMetadataUtil.PARTITION_NAME_BLOOM_FILTERS))
{
val prunedPartitionAndFileNames =
getPrunedPartitionsAndFileNamesMap(prunedPartitionsAndFileSlices,
includeLogFiles = true)
Option.apply(getCandidateFilesForKeys(indexPartition,
prunedPartitionAndFileNames, literals))
@@ -78,8 +93,238 @@ class ExpressionIndexSupport(spark: SparkSession,
}
}
+ /**
+ * Loads view of the Column Stats Index in a transposed format where single
row coalesces every columns'
+ * statistics for a single file, returning it as [[DataFrame]]
+ *
+ * Please check out scala-doc of the [[transpose]] method explaining this
view in more details
+ */
+ def loadTransposed[T](targetColumns: Seq[String],
+ shouldReadInMemory: Boolean,
+ colStatRecords: HoodieData[HoodieMetadataColumnStats],
+ expressionIndexQuery: Expression) (block: DataFrame =>
T): T = {
+ cachedColumnStatsIndexViews.get(targetColumns) match {
+ case Some(cachedDF) =>
+ block(cachedDF)
+ case None =>
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
colStatRecords
+ withPersistedData(colStatsRecords, StorageLevel.MEMORY_ONLY) {
+ val (transposedRows, indexSchema) = transpose(colStatsRecords,
targetColumns, expressionIndexQuery)
+ val df = if (shouldReadInMemory) {
+ // NOTE: This will instantiate a [[Dataset]] backed by
[[LocalRelation]] holding all of the rows
+ // of the transposed table in memory, facilitating execution
of the subsequently chained operations
+ // on it locally (on the driver; all such operations are
actually going to be performed by Spark's
+ // Optimizer)
+ createDataFrameFromRows(spark,
transposedRows.collectAsList().asScala.toSeq, indexSchema)
+ } else {
+ val rdd = HoodieJavaRDD.getJavaRDD(transposedRows)
+ spark.createDataFrame(rdd, indexSchema)
+ }
+
+ val allowCaching: Boolean = false
+ if (allowCaching) {
+ cachedColumnStatsIndexViews.put(targetColumns, df)
+ // NOTE: Instead of collecting the rows from the index and hold
them in memory, we instead rely
+ // on Spark as (potentially distributed) cache managing data
lifecycle, while we simply keep
+ // the referenced to persisted [[DataFrame]] instance
+ df.persist(StorageLevel.MEMORY_ONLY)
+
+ block(df)
+ } else {
+ withPersistedDataset(df) {
+ block(df)
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Transposes and converts the raw table format of the Column Stats Index
representation,
+ * where each row/record corresponds to individual (column, file) pair, into
the table format
+ * where each row corresponds to single file with statistic for individual
columns collated
+ * w/in such row:
+ *
+ * Metadata Table Column Stats Index format:
+ *
+ * <pre>
+ *
+---------------------------+------------+------------+------------+-------------+
+ * | fileName | columnName | minValue | maxValue |
num_nulls |
+ *
+---------------------------+------------+------------+------------+-------------+
+ * | one_base_file.parquet | A | 1 | 10 |
0 |
+ * | another_base_file.parquet | A | -10 | 0 |
5 |
+ *
+---------------------------+------------+------------+------------+-------------+
+ * </pre>
+ *
+ * Returned table format
+ *
+ * <pre>
+ * +---------------------------+------------+------------+-------------+
+ * | file | A_minValue | A_maxValue | A_nullCount |
+ * +---------------------------+------------+------------+-------------+
+ * | one_base_file.parquet | 1 | 10 | 0 |
+ * | another_base_file.parquet | -10 | 0 | 5 |
+ * +---------------------------+------------+------------+-------------+
+ * </pre>
+ *
+ * NOTE: Column Stats Index might potentially contain statistics for many
columns (if not all), while
+ * query at hand might only be referencing a handful of those. As
such, we collect all the
+ * column references from the filtering expressions, and only
transpose records corresponding to the
+ * columns referenced in those
+ *
+ * @param colStatsRecords [[HoodieData[HoodieMetadataColumnStats]]] bearing
raw Column Stats Index records
+ * @param queryColumns target columns to be included into the final table
+ * @return reshaped table according to the format outlined above
+ */
+ private def transpose(colStatsRecords:
HoodieData[HoodieMetadataColumnStats], queryColumns: Seq[String],
expressionIndexQuery: Expression): (HoodieData[Row], StructType) = {
+ val tableSchemaFieldMap = tableSchema.fields.map(f => (f.name, f)).toMap
+ // NOTE: We're sorting the columns to make sure final index schema matches
layout
+ // of the transposed table
+ val sortedTargetColumnsSet = TreeSet(queryColumns:_*)
+
+ // NOTE: This is a trick to avoid pulling all of
[[ColumnStatsIndexSupport]] object into the lambdas'
+ // closures below
+ val indexedColumns = queryColumns.toSet
+
+ // NOTE: It's crucial to maintain appropriate ordering of the columns
+ // matching table layout: hence, we cherry-pick individual columns
+ // instead of simply filtering in the ones we're interested in the
schema
+ val (indexSchema, targetIndexedColumns) =
composeIndexSchema(sortedTargetColumnsSet.toSeq, indexedColumns, tableSchema,
expressionIndexQuery)
+
+ // Here we perform complex transformation which requires us to modify the
layout of the rows
+ // of the dataset, and therefore we rely on low-level RDD API to avoid
incurring encoding/decoding
+ // penalty of the [[Dataset]], since it's required to adhere to its schema
at all times, while
+ // RDDs are not;
+ val transposedRows: HoodieData[Row] = colStatsRecords
+ //TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
+ .filter(JFunction.toJavaSerializableFunction(r =>
sortedTargetColumnsSet.contains(r.getColumnName)))
+ .mapToPair(JFunction.toJavaSerializablePairFunction(r => {
+ if (r.getMinValue == null && r.getMaxValue == null) {
+ // Corresponding row could be null in either of the 2 cases
+ // - Column contains only null values (in that case both min/max
have to be nulls)
+ // - This is a stubbed Column Stats record (used as a tombstone)
+ collection.Pair.of(r.getFileName, r)
+ } else {
+ val minValueWrapper = r.getMinValue
+ val maxValueWrapper = r.getMaxValue
+
+ checkState(minValueWrapper != null && maxValueWrapper != null,
"Invalid Column Stats record: either both min/max have to be null, or both have
to be non-null")
+
+ val colName = r.getColumnName
+ val colType = tableSchemaFieldMap(colName).dataType
+
+ val minValue = deserialize(tryUnpackValueWrapper(minValueWrapper),
colType)
+ val maxValue = deserialize(tryUnpackValueWrapper(maxValueWrapper),
colType)
+
+ // Update min-/max-value structs w/ unwrapped values in-place
+ r.setMinValue(minValue)
+ r.setMaxValue(maxValue)
+
+ collection.Pair.of(r.getFileName, r)
+ }
+ }))
+ .groupByKey()
+ .map(JFunction.toJavaSerializableFunction(p => {
+ val columnRecordsSeq: Seq[HoodieMetadataColumnStats] =
p.getValue.asScala.toSeq
+ val fileName: String = p.getKey
+ val valueCount: Long = columnRecordsSeq.head.getValueCount
+
+ // To properly align individual rows (corresponding to a file) w/in
the transposed projection, we need
+ // to align existing column-stats for individual file with the list of
expected ones for the
+ // whole transposed projection (a superset of all files)
+ val columnRecordsMap = columnRecordsSeq.map(r => (r.getColumnName,
r)).toMap
+ val alignedColStatRecordsSeq =
targetIndexedColumns.map(columnRecordsMap.get)
+
+ val coalescedRowValuesSeq =
+ alignedColStatRecordsSeq.foldLeft(ListBuffer[Any](fileName,
valueCount)) {
+ case (acc, opt) =>
+ opt match {
+ case Some(colStatRecord) =>
+ acc ++= Seq(colStatRecord.getMinValue,
colStatRecord.getMaxValue, colStatRecord.getNullCount)
+ case None =>
+ // NOTE: This could occur in either of the following cases:
+ // 1. When certain columns exist in the schema but are
absent in some data files due to
+ // schema evolution or other reasons, these columns
will not be present in the column stats.
+ // In this case, we fill in default values by setting
the min, max and null-count to null
+ // (this behavior is consistent with reading
non-existent columns from Parquet).
+ // 2. When certain columns are present both in the schema
and the data files,
+ // but the column stats are absent for these columns
due to their types not supporting indexing,
+ // we also set these columns to default values.
+ //
+ // This approach prevents errors during data skipping and,
because the filter includes an isNull check,
+ // these conditions will not affect the accurate return of
files from data skipping.
+ acc ++= Seq(null, null, null)
+ }
+ }
+
+ Row(coalescedRowValuesSeq.toSeq: _*)
+ }))
+
+ (transposedRows, indexSchema)
+ }
+
+ def composeIndexSchema(targetColumnNames: Seq[String], indexedColumns:
Set[String], tableSchema: StructType, expressionIndexQuery: Expression):
(StructType, Seq[String]) = {
+ val fileNameField =
StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME, StringType,
nullable = true, Metadata.empty)
+ val valueCountField =
StructField(HoodieMetadataPayload.COLUMN_STATS_FIELD_VALUE_COUNT, LongType,
nullable = true, Metadata.empty)
+
+ val targetIndexedColumns =
targetColumnNames.filter(indexedColumns.contains(_))
+ val targetIndexedFields = targetIndexedColumns.map(colName =>
tableSchema.fields.find(f => f.name == colName).get)
+
+ val dataType: DataType = expressionIndexQuery match {
+ case eq: EqualTo => eq.right.asInstanceOf[Literal].dataType
+ case in: In => in.list(0).asInstanceOf[Literal].dataType
+ case _ => targetIndexedFields(0).dataType
+ }
+
+ (StructType(
+ targetIndexedFields.foldLeft(Seq(fileNameField, valueCountField)) {
+ case (acc, field) =>
+ acc ++ Seq(
+ composeColumnStatStructType(field.name,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MIN_VALUE, dataType),
+ composeColumnStatStructType(field.name,
HoodieMetadataPayload.COLUMN_STATS_FIELD_MAX_VALUE, dataType),
+ composeColumnStatStructType(field.name,
HoodieMetadataPayload.COLUMN_STATS_FIELD_NULL_COUNT, LongType))
+ }
+ ), targetIndexedColumns)
+ }
+
+ def loadColumnStatsIndexRecords(targetColumns: Seq[String],
prunedPartitions: Option[Set[String]] = None, shouldReadInMemory: Boolean):
HoodieData[HoodieMetadataColumnStats] = {
+ // Read Metadata Table's Column Stats Index records into [[HoodieData]]
container by
+ // - Fetching the records from CSI by key-prefixes (encoded column
names)
+ // - Extracting [[HoodieMetadataColumnStats]] records
+ // - Filtering out nulls
+ checkState(targetColumns.nonEmpty)
+
+ // TODO encoding should be done internally w/in HoodieBackedTableMetadata
+    val encodedTargetColumnNames = targetColumns.map(colName => new
ColumnIndexID(colName).asBase64EncodedString())
+    // encode column name and partition name if partition list is available
+ val keyPrefixes = if (prunedPartitions.isDefined) {
+ prunedPartitions.get.map(partitionPath =>
+ new
PartitionIndexID(HoodieTableMetadataUtil.getPartitionIdentifier(partitionPath)).asBase64EncodedString()
+ ).flatMap(encodedPartition => {
+ encodedTargetColumnNames.map(encodedTargetColumn =>
encodedTargetColumn.concat(encodedPartition))
+ })
+ } else {
+ encodedTargetColumnNames
+ }
+
+ val metadataRecords: HoodieData[HoodieRecord[HoodieMetadataPayload]] =
+ metadataTable.getRecordsByKeyPrefixes(keyPrefixes.toSeq.asJava,
HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, shouldReadInMemory)
+
+ val columnStatsRecords: HoodieData[HoodieMetadataColumnStats] =
+ //TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
+ metadataRecords.map(JFunction.toJavaSerializableFunction(record => {
+ toScalaOption(record.getData.getInsertValue(null, null))
+ .map(metadataRecord =>
metadataRecord.asInstanceOf[HoodieMetadataRecord].getColumnStatsMetadata)
+ .orNull
+ }))
+ .filter(JFunction.toJavaSerializableFunction(columnStatsRecord =>
columnStatsRecord != null))
+
+ columnStatsRecords
+ }
+
override def invalidateCaches(): Unit = {
- // no caches for this index type, do nothing
+ cachedColumnStatsIndexViews.foreach { case (_, df) => df.unpersist() }
+ cachedColumnStatsIndexViews.clear()
}
/**
@@ -89,15 +334,17 @@ class ExpressionIndexSupport(spark: SparkSession,
metadataConfig.isEnabled && metaClient.getIndexMetadata.isPresent &&
!metaClient.getIndexMetadata.get().getIndexDefinitions.isEmpty
}
- def filterQueriesWithFunctionalFilterKey(queryFilters: Seq[Expression],
sourceFieldOpt: Option[String]): List[Tuple2[Expression, List[String]]] = {
+ private def filterQueriesWithFunctionalFilterKey(queryFilters:
Seq[Expression], sourceFieldOpt: Option[String]): List[Tuple2[Expression,
List[String]]] = {
var expressionIndexQueries: List[Tuple2[Expression, List[String]]] =
List.empty
for (query <- queryFilters) {
- filterQueryWithRecordKey(query, sourceFieldOpt, (expr: Expression) => {
+ val attributeFetcher = (expr: Expression) => {
expr match {
case expression: UnaryExpression => expression.child
+ case expression: FromUnixTime => expression.sec
case other => other
}
- }).foreach({
+ }
+ filterQueryWithRecordKey(query, sourceFieldOpt,
attributeFetcher).foreach({
case (exp: Expression, literals: List[String]) =>
expressionIndexQueries = expressionIndexQueries :+ Tuple2.apply(exp,
literals)
})
@@ -119,17 +366,17 @@ class ExpressionIndexSupport(spark: SparkSession,
* @return An `Option` containing the index partition identifier if a
matching index definition is found.
* Returns `None` if no matching index definition is found.
*/
- private def getExpressionIndexPartitionAndLiterals(queryFilters:
Seq[Expression]): Option[Tuple2[String, List[String]]] = {
+ private def getExpressionIndexPartitionAndLiterals(queryFilters:
Seq[Expression]): Option[Tuple3[String, Expression, List[String]]] = {
val indexDefinitions =
metaClient.getIndexMetadata.get().getIndexDefinitions.asScala
if (indexDefinitions.nonEmpty) {
val functionDefinitions = indexDefinitions.values
.filter(definition =>
MetadataPartitionType.fromPartitionPath(definition.getIndexName).equals(MetadataPartitionType.EXPRESSION_INDEX))
.toList
- var indexPartitionAndLiteralsOpt: Option[Tuple2[String, List[String]]] =
Option.empty
+ var indexPartitionAndLiteralsOpt: Option[Tuple3[String, Expression,
List[String]]] = Option.empty
functionDefinitions.foreach(indexDefinition => {
val queryInfoOpt = extractQueryAndLiterals(queryFilters,
indexDefinition)
if (queryInfoOpt.isDefined) {
- indexPartitionAndLiteralsOpt =
Option.apply(Tuple2.apply(indexDefinition.getIndexName, queryInfoOpt.get._2))
+ indexPartitionAndLiteralsOpt =
Option.apply(Tuple3.apply(indexDefinition.getIndexName, queryInfoOpt.get._1,
queryInfoOpt.get._2))
}
})
indexPartitionAndLiteralsOpt
@@ -155,12 +402,38 @@ class ExpressionIndexSupport(spark: SparkSession,
val functionNameOption =
SPARK_FUNCTION_MAP.asScala.keys.find(expr.toString.contains)
val functionName = functionNameOption.getOrElse("identity")
if (indexDefinition.getIndexFunction.equals(functionName)) {
- queryAndLiteralsOpt = Option.apply(Tuple2.apply(expr, literals))
+ val attributeFetcher = (expr: Expression) => {
+ expr match {
+ case expression: UnaryExpression => expression.child
+ case expression: FromUnixTime => expression.sec
+ case other => other
+ }
+ }
+ if (functionName.equals(SPARK_FROM_UNIXTIME)) {
+ val configuredFormat =
indexDefinition.getIndexOptions.getOrDefault("format",
TimestampFormatter.defaultPattern)
+ if (expr.toString().contains(configuredFormat)) {
+ val pruningExpr = fetchQueryWithAttribute(expr,
Option.apply(indexDefinition.getSourceFields.get(0)),
RecordLevelIndexSupport.getSimpleLiteralGenerator(), attributeFetcher)._1.get._1
+ queryAndLiteralsOpt = Option.apply(Tuple2.apply(pruningExpr,
literals))
+ }
+ } else {
+ val pruningExpr = fetchQueryWithAttribute(expr,
Option.apply(indexDefinition.getSourceFields.get(0)),
RecordLevelIndexSupport.getSimpleLiteralGenerator(), attributeFetcher)._1.get._1
+ queryAndLiteralsOpt = Option.apply(Tuple2.apply(pruningExpr,
literals))
+ }
}
}
queryAndLiteralsOpt
}
+ private def loadExpressionIndexRecords(indexPartition: String,
+ prunedPartitions: Set[String],
+ shouldReadInMemory: Boolean):
HoodieData[HoodieMetadataColumnStats] = {
+ val indexDefinition =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(indexPartition)
+ val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
loadExpressionIndexForColumnsInternal(
+ indexDefinition.getSourceFields.asScala.toSeq, prunedPartitions,
indexPartition, shouldReadInMemory)
+ //TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
+ colStatsRecords
+ }
+
def loadExpressionIndexDataFrame(indexPartition: String,
prunedPartitions: Set[String],
shouldReadInMemory: Boolean): DataFrame = {
@@ -169,10 +442,10 @@ class ExpressionIndexSupport(spark: SparkSession,
val colStatsRecords: HoodieData[HoodieMetadataColumnStats] =
loadExpressionIndexForColumnsInternal(
indexDefinition.getSourceFields.asScala.toSeq, prunedPartitions,
indexPartition, shouldReadInMemory)
//TODO: [HUDI-8303] Explicit conversion might not be required for Scala
2.12+
- val catalystRows: HoodieData[InternalRow] =
colStatsRecords.mapPartitions(JFunction.toJavaSerializableFunction(it => {
+ val catalystRows: HoodieData[InternalRow] =
colStatsRecords.map[InternalRow](JFunction.toJavaSerializableFunction(r => {
val converter =
AvroConversionUtils.createAvroToInternalRowConverter(HoodieMetadataColumnStats.SCHEMA$,
columnStatsRecordStructType)
- it.asScala.map(r => converter(r).orNull).asJava
- }), false)
+ converter(r).orNull
+ }))
if (shouldReadInMemory) {
// NOTE: This will instantiate a [[Dataset]] backed by
[[LocalRelation]] holding all of the rows
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
index 225d806f92b..3d1aea3ffa3 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala
@@ -112,7 +112,7 @@ case class HoodieFileIndex(spark: SparkSession,
new RecordLevelIndexSupport(spark, metadataConfig, metaClient),
new BucketIndexSupport(spark, metadataConfig, metaClient),
new SecondaryIndexSupport(spark, metadataConfig, metaClient),
- new ExpressionIndexSupport(spark, metadataConfig, metaClient),
+ new ExpressionIndexSupport(spark, schema, metadataConfig, metaClient),
new BloomFiltersIndexSupport(spark, metadataConfig, metaClient),
new ColumnStatsIndexSupport(spark, schema, metadataConfig, metaClient)
)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
index 454014b9728..1cb20026876 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/RecordLevelIndexSupport.scala
@@ -229,6 +229,80 @@ object RecordLevelIndexSupport {
}
}
+ def fetchQueryWithAttribute(queryFilter: Expression, recordKeyOpt:
Option[String], literalGenerator: Function2[AttributeReference, Literal,
String],
+ attributeFetcher: Function1[Expression,
Expression]): (Option[(Expression, List[String])], Boolean) = {
+ queryFilter match {
+ case equalToQuery: EqualTo =>
+ val attributeLiteralTuple =
getAttributeLiteralTuple(attributeFetcher.apply(equalToQuery.left),
attributeFetcher.apply(equalToQuery.right)).orNull
+ if (attributeLiteralTuple != null) {
+ val attribute = attributeLiteralTuple._1
+ val literal = attributeLiteralTuple._2
+ if (attribute != null && attribute.name != null &&
attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+ val recordKeyLiteral = literalGenerator.apply(attribute, literal)
+ (Option.apply(EqualTo(attribute, literal),
List.apply(recordKeyLiteral)), true)
+ } else {
+ (Option.empty, true)
+ }
+ } else {
+ (Option.empty, true)
+ }
+
+ case inQuery: In =>
+ var validINQuery = true
+ val attributeOpt = Option.apply(
+ attributeFetcher.apply(inQuery.value) match {
+ case attribute: AttributeReference =>
+ if (!attributeMatchesRecordKey(attribute.name, recordKeyOpt)) {
+ validINQuery = false
+ null
+ } else {
+ attribute
+ }
+ case _ =>
+ validINQuery = false
+ null
+ })
+ var literals: List[String] = List.empty
+ inQuery.list.foreach {
+ case literal: Literal if attributeOpt.isDefined =>
+ val recordKeyLiteral = literalGenerator.apply(attributeOpt.get,
literal)
+ literals = literals :+ recordKeyLiteral
+ case _ => validINQuery = false
+ }
+ if (validINQuery) {
+ (Option.apply(In(attributeOpt.get, inQuery.list), literals), true)
+ } else {
+ (Option.empty, true)
+ }
+
+ // Handle And expression (composite filter)
+ case andQuery: And =>
+ val leftResult = filterQueryWithRecordKey(andQuery.left, recordKeyOpt,
literalGenerator, attributeFetcher)
+ val rightResult = filterQueryWithRecordKey(andQuery.right,
recordKeyOpt, literalGenerator, attributeFetcher)
+
+ val isSupported = leftResult._2 && rightResult._2
+ if (!isSupported) {
+ (Option.empty, false)
+ } else {
+ // If both left and right filters are valid, concatenate their
results
+ (leftResult._1, rightResult._1) match {
+ case (Some((leftExp, leftKeys)), Some((rightExp, rightKeys))) =>
+ // Return concatenated expressions and record keys
+ (Option.apply(And(leftExp, rightExp), leftKeys ++ rightKeys),
true)
+ case (Some((leftExp, leftKeys)), None) =>
+ // Return concatenated expressions and record keys
+ (Option.apply(leftExp, leftKeys), true)
+ case (None, Some((rightExp, rightKeys))) =>
+ // Return concatenated expressions and record keys
+ (Option.apply(rightExp, rightKeys), true)
+ case _ => (Option.empty, true)
+ }
+ }
+
+ case _ => (Option.empty, false)
+ }
+ }
+
/**
* Returns the list of storage paths from the pruned partitions and file
slices.
*
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
index ad0b7bcd6b6..c698ad37ee8 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkBaseIndexSupport.scala
@@ -99,9 +99,9 @@ abstract class SparkBaseIndexSupport(spark: SparkSession,
(prunedPartitions, prunedFiles)
}
- protected def getCandidateFiles(indexDf: DataFrame, queryFilters:
Seq[Expression], prunedFileNames: Set[String]): Set[String] = {
+ protected def getCandidateFiles(indexDf: DataFrame, queryFilters:
Seq[Expression], prunedFileNames: Set[String], isExpressionIndex: Boolean =
false): Set[String] = {
val indexSchema = indexDf.schema
- val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_,
indexSchema)).reduce(And)
+ val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexSchema,
isExpressionIndex)).reduce(And)
val prunedCandidateFileNames =
indexDf.where(new Column(indexFilter))
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
index b9252eb831d..6e74c2f1e35 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
@@ -33,16 +33,18 @@ import org.apache.spark.unsafe.types.UTF8String
object DataSkippingUtils extends Logging {
/**
- * Translates provided {@link filterExpr} into corresponding
filter-expression for column-stats index index table
- * to filter out candidate files that would hold records matching the
original filter
+ * Translates provided {@link filterExpr} into corresponding
filter-expression for column-stats index table
+ * to filter out candidate files that would hold records matching the
original filter.
+   * In case the column stats were created using an expression index, the index
filter must also account for the expression.
*
* @param dataTableFilterExpr source table's query's filter expression
* @param indexSchema index table schema
+ * @param isExpressionIndex whether the index is an expression index
* @return filter for column-stats index's table
*/
- def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression,
indexSchema: StructType): Expression = {
+ def translateIntoColumnStatsIndexFilterExpr(dataTableFilterExpr: Expression,
indexSchema: StructType, isExpressionIndex: Boolean = false): Expression = {
try {
- createColumnStatsIndexFilterExprInternal(dataTableFilterExpr,
indexSchema)
+ createColumnStatsIndexFilterExprInternal(dataTableFilterExpr,
indexSchema, isExpressionIndex)
} catch {
case e: AnalysisException =>
logDebug(s"Failed to translated provided data table filter expr into
column stats one ($dataTableFilterExpr)", e)
@@ -50,10 +52,10 @@ object DataSkippingUtils extends Logging {
}
}
- private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr:
Expression, indexSchema: StructType): Expression = {
+ private def createColumnStatsIndexFilterExprInternal(dataTableFilterExpr:
Expression, indexSchema: StructType, isExpressionIndex: Boolean = false):
Expression = {
// Try to transform original Source Table's filter expression into
// Column-Stats Index filter expression
- tryComposeIndexFilterExpr(dataTableFilterExpr, indexSchema) match {
+ tryComposeIndexFilterExpr(dataTableFilterExpr, indexSchema,
isExpressionIndex) match {
case Some(e) => e
// NOTE: In case we can't transform source filter expression, we fallback
// to {@code TrueLiteral}, to essentially avoid pruning any indexed
files from scanning
@@ -61,7 +63,7 @@ object DataSkippingUtils extends Logging {
}
}
- private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression,
indexSchema: StructType): Option[Expression] = {
+ private def tryComposeIndexFilterExpr(sourceFilterExpr: Expression,
indexSchema: StructType, isExpressionIndex: Boolean = false):
Option[Expression] = {
//
// For translation of the Filter Expression for the Data Table into Filter
Expression for Column Stats Index, we're
// assuming that
@@ -93,7 +95,7 @@ object DataSkippingUtils extends Logging {
//
sourceFilterExpr match {
// If Expression is not resolved, we can't perform the analysis
accurately, bailing
- case expr if !expr.resolved => None
+ case expr if !expr.resolved && !isExpressionIndex => None
// Filter "expr(colA) = B" and "B = expr(colA)"
// Translates to "(expr(colA_minValue) <= B) AND (B <=
expr(colA_maxValue))" condition for index lookup
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
index e1604c98892..d4f230adc37 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestExpressionIndex.scala
@@ -19,12 +19,11 @@
package org.apache.spark.sql.hudi.command.index
-import org.apache.hudi.DataSourceWriteOptions.{INSERT_OPERATION_OPT_VAL,
OPERATION, PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
+import org.apache.hudi.DataSourceWriteOptions.{HIVE_PASS, HIVE_USER,
HIVE_USE_PRE_APACHE_INPUT_FORMAT, INSERT_OPERATION_OPT_VAL, OPERATION,
PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD, TABLE_TYPE}
import org.apache.hudi.HoodieConversionUtils.toProperties
import org.apache.hudi.client.SparkRDDWriteClient
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.client.utils.SparkMetadataWriterUtils
-import org.apache.hudi.{DataSourceReadOptions, ExpressionIndexSupport,
HoodieFileIndex, HoodieSparkUtils}
import org.apache.hudi.common.config.{HoodieMetadataConfig,
HoodieStorageConfig, TypedProperties}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.FileSlice
@@ -33,7 +32,6 @@ import org.apache.hudi.common.table.view.FileSystemViewManager
import org.apache.hudi.common.testutils.HoodieTestUtils
import org.apache.hudi.common.util.Option
import org.apache.hudi.config.{HoodieCleanConfig, HoodieCompactionConfig,
HoodieIndexConfig, HoodieWriteConfig}
-import org.apache.hudi.hive.HiveSyncConfigHolder._
import org.apache.hudi.hive.testutils.HiveTestUtil
import org.apache.hudi.hive.{HiveSyncTool, HoodieHiveSyncClient}
import org.apache.hudi.index.HoodieIndex
@@ -44,8 +42,10 @@ import org.apache.hudi.storage.StoragePath
import org.apache.hudi.sync.common.HoodieSyncConfig.{META_SYNC_BASE_PATH,
META_SYNC_DATABASE_NAME, META_SYNC_NO_PARTITION_METADATA, META_SYNC_TABLE_NAME}
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
import org.apache.hudi.util.JFunction
+import org.apache.hudi.{DataSourceReadOptions, ExpressionIndexSupport,
HoodieFileIndex, HoodieSparkUtils}
import org.apache.spark.api.java.JavaSparkContext
-import org.apache.spark.sql.{Column, SaveMode}
+import org.apache.spark.sql.Column.unapply
+import org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr
import org.apache.spark.sql.catalyst.analysis.{Analyzer, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, FromUnixTime, Literal, Upper}
@@ -53,7 +53,8 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.hudi.command.{CreateIndexCommand,
ShowIndexesCommand}
import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
-import org.apache.spark.sql.types.{BinaryType, ByteType, DateType,
DecimalType, IntegerType, ShortType, StringType, StructType, TimestampType}
+import org.apache.spark.sql.types.{BinaryType, ByteType, DateType,
DecimalType, DoubleType, IntegerType, LongType, ShortType, StringType,
StructField, StructType, TimestampType}
+import org.apache.spark.sql.{Column, SaveMode, functions}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
import org.scalatest.Ignore
@@ -442,6 +443,10 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
| partitioned by(price)
| location '$basePath'
""".stripMargin)
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
spark.sql(s"insert into $tableName (id, name, ts, price) values(1,
'a1', 1000, 10)")
spark.sql(s"insert into $tableName (id, name, ts, price) values(2,
'a2', 200000, 100)")
spark.sql(s"insert into $tableName (id, name, ts, price) values(3,
'a3', 2000000000, 1000)")
@@ -758,6 +763,99 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
}
}
+ /**
+ * Test expression index with data skipping for unary expression and binary
expression.
+ */
+ @Test
+ def testColumnStatsPruningWithUnaryBinaryExpr(): Unit = {
+ if (HoodieSparkUtils.gteqSpark3_3) {
+ withTempDir { tmp =>
+ Seq("cow", "mor").foreach { tableType =>
+ val tableName = generateTableName +
s"_stats_pruning_binary_$tableType"
+ val basePath = s"${tmp.getCanonicalPath}/$tableName"
+
+ spark.sql(
+ s"""
+ CREATE TABLE $tableName (
+ | ts LONG,
+ | id STRING,
+ | rider STRING,
+ | driver STRING,
+ | fare DOUBLE,
+ | city STRING,
+ | state STRING
+ |) USING HUDI
+ |options(
+ | primaryKey ='id',
+ | type = '$tableType',
+ | hoodie.metadata.enable = 'true',
+ | hoodie.datasource.write.recordkey.field = 'id',
+ | hoodie.enable.data.skipping = 'true'
+ |)
+ |PARTITIONED BY (state)
+ |location '$basePath'
+ |""".stripMargin)
+
+ spark.sql("set hoodie.parquet.small.file.limit=0")
+ if (HoodieSparkUtils.gteqSpark3_4) {
+ spark.sql("set spark.sql.defaultColumn.enabled=false")
+ }
+
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, city,
state) VALUES
+ |
(1695414527,'trip1','rider-A','driver-K',19.10,'san_francisco','california'),
+ |
(1695414531,'trip6','rider-C','driver-K',17.14,'san_diego','california'),
+ |
(1695332066,'trip3','rider-E','driver-O',93.50,'austin','texas'),
+ |
(1695516137,'trip4','rider-F','driver-P',34.15,'houston','texas')
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into $tableName(ts, id, rider, driver, fare, city,
state) VALUES
+ |
(1695414520,'trip2','rider-C','driver-M',27.70,'sunnyvale','california'),
+ |
(1699349649,'trip5','rider-A','driver-Q',3.32,'san_diego','texas')
+ |""".stripMargin)
+
+ // With unary expression
+ spark.sql(s"create index idx_rider on $tableName using
column_stats(rider) options(expr='lower')")
+ // With binary expression
+ spark.sql(s"create index idx_datestr on $tableName using
column_stats(ts) options(expr='from_unixtime', format='yyyy-MM-dd')")
+ // validate index created successfully
+ var metaClient = createMetaClient(spark, basePath)
+ assertTrue(metaClient.getIndexMetadata.isPresent)
+ val expressionIndexMetadata = metaClient.getIndexMetadata.get()
+ assertEquals("expr_index_idx_datestr",
expressionIndexMetadata.getIndexDefinitions.get("expr_index_idx_datestr").getIndexName)
+
+ val tableSchema: StructType =
+ StructType(
+ Seq(
+ StructField("ts", LongType),
+ StructField("id", StringType),
+ StructField("rider", StringType),
+ StructField("driver", StringType),
+ StructField("fare", DoubleType),
+ StructField("city", StringType),
+ StructField("state", StringType)
+ )
+ )
+ val opts = Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key
-> "true", HoodieMetadataConfig.ENABLE.key -> "true")
+ metaClient = createMetaClient(spark, basePath)
+
+ // validate skipping with both types of expression
+ val lowerExpr = resolveExpr(spark,
unapply(functions.lower(functions.col("rider"))).get, tableSchema)
+ var literal = Literal.create("rider-c")
+ var dataFilter = EqualTo(lowerExpr, literal)
+ verifyFilePruning(opts, dataFilter, metaClient,
isDataSkippingExpected = true)
+
+ val fromUnixTime = resolveExpr(spark,
unapply(functions.from_unixtime(functions.col("ts"), "yyyy-MM-dd")).get,
tableSchema)
+ literal = Literal.create("2023-11-07")
+ dataFilter = EqualTo(fromUnixTime, literal)
+ verifyFilePruning(opts, dataFilter, metaClient,
isDataSkippingExpected = true)
+ }
+ }
+ }
+ }
+
@Test
def testBloomFiltersIndexPruning(): Unit = {
if (HoodieSparkUtils.gteqSpark3_3) {
@@ -1054,7 +1152,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
val metadataConfig = HoodieMetadataConfig.newBuilder()
.fromProperties(toProperties(metadataOpts))
.build()
- val expressionIndexSupport = new ExpressionIndexSupport(spark,
metadataConfig, metaClient)
+ val expressionIndexSupport = new ExpressionIndexSupport(spark, null,
metadataConfig, metaClient)
val prunedPartitions = Set("9")
var indexDf =
expressionIndexSupport.loadExpressionIndexDataFrame("expr_index_idx_datestr",
prunedPartitions, shouldReadInMemory = true)
// check only one record returned corresponding to the pruned partition
@@ -1144,7 +1242,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase {
.build()
val fileIndex = new HoodieFileIndex(spark, metaClient, None,
opts ++ metadataOpts ++ Map("glob.paths" -> s"$basePath/9",
DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true"), includeLogFiles =
true)
- val expressionIndexSupport = new ExpressionIndexSupport(spark,
metadataConfig, metaClient)
+ val expressionIndexSupport = new ExpressionIndexSupport(spark, null,
metadataConfig, metaClient)
val partitionFilter: Expression = EqualTo(AttributeReference("c8",
IntegerType)(), Literal(9))
val (isPruned, prunedPaths) =
fileIndex.prunePartitionsAndGetFileSlices(Seq.empty, Seq(partitionFilter))
assertTrue(isPruned)