alexeykudinkin commented on a change in pull request #4060:
URL: https://github.com/apache/hudi/pull/4060#discussion_r754585272



##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
##########
@@ -29,148 +30,186 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
 
-object DataSkippingUtils {
+object DataSkippingUtils extends Logging {
 
   /**
-    * create z_index filter and push those filters to index table to filter 
all candidate scan files.
-    * @param condition  origin filter from query.
-    * @param indexSchema schema from index table.
-    * @return filters for index table.
-    */
-  def createZindexFilter(condition: Expression, indexSchema: StructType): 
Expression = {
-    def buildExpressionInternal(colName: Seq[String], statisticValue: String): 
Expression = {
-      val appendColName = UnresolvedAttribute(colName).name + statisticValue
-      col(appendColName).expr
-    }
-
-    def reWriteCondition(colName: Seq[String], conditionExpress: Expression): 
Expression = {
-      val appendColName = UnresolvedAttribute(colName).name + "_minValue"
-      if (indexSchema.exists(p => p.name == appendColName)) {
-        conditionExpress
-      } else {
-        Literal.TrueLiteral
-      }
-    }
-
-    val minValue = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_minValue")
-    val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_maxValue")
-    val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_num_nulls")
-
-    condition match {
-      // query filter "colA = b"  convert it to "colA_minValue <= b and 
colA_maxValue >= b" for index table
+   * Translates provided {@link filterExpr} into corresponding 
filter-expression for Z-index index table
+   * to filter out candidate files that would hold records matching the 
original filter
+   *
+   * @param filterExpr  original filter from query
+   * @param indexSchema index table schema
+   * @return filter for Z-index table
+   */
+  def createZIndexLookupFilter(filterExpr: Expression, indexSchema: 
StructType): Expression = {
+
+    def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr
+    def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr
+    def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr
+
+    def colContainsValuesEqualToLiteral(colName: String, value: Literal) =
+      // Only case when column C contains value V is when min(C) <= V <= max(c)
+      And(LessThanOrEqual(minValue(colName), value), 
GreaterThanOrEqual(maxValue(colName), value))
+
+    def colContainsValuesEqualToLiterals(colName: String, list: Seq[Literal]) =
+      // Only case when column C contains _any_ of the values V1, V2, etc is 
when either
+      //    min(C) <= V1 <= max(c) OR
+      //    min(C) <= V2 <= max(c) OR
+      //    ...
+      list.map { lit => colContainsValuesEqualToLiteral(colName, lit) 
}.reduce(Or)
+
+    filterExpr match {
+      // Filter "colA = b"
+      // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition 
for index lookup
       case EqualTo(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), 
value), GreaterThanOrEqual(maxValue(colName), value)))
-      // query filter "b = colA"  convert it to "colA_minValue <= b and 
colA_maxValue >= b" for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiteral(colName, value)
+      // Filter "b = colA"
+      // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition 
for index lookup
       case EqualTo(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), 
value), GreaterThanOrEqual(maxValue(colName), value)))
-      // query filter "colA = null"  convert it to "colA_num_nulls = null" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiteral(colName, value)
+      // Filter "colA = null"
+      // Translates to "colA_num_nulls = null" for index lookup
       case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ 
Literal(null, _)) =>
-        val colName = getTargetColNameParts(equalNullSafe.left)
-        reWriteCondition(colName, EqualTo(num_nulls(colName), 
equalNullSafe.right))
-      // query filter "colA < b"  convert it to "colA_minValue < b" for index 
table
+        val colName = getTargetColName(equalNullSafe.left, indexSchema)
+        EqualTo(numNulls(colName), equalNullSafe.right)
+      // Filter "colA < b"
+      // Translates to "colA_minValue < b" for index lookup
       case LessThan(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName,LessThan(minValue(colName), value))
-      // query filter "b < colA"  convert it to "colA_maxValue > b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThan(minValue(colName), value)
+      // Filter "b < colA"
+      // Translates to "b < colA_maxValue" for index lookup
       case LessThan(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
-      // query filter "colA > b"  convert it to "colA_maxValue > b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(maxValue(colName), value)
+      // Filter "colA > b"
+      // Translates to "colA_maxValue > b" for index lookup
       case GreaterThan(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
-      // query filter "b > colA"  convert it to "colA_minValue < b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(maxValue(colName), value)
+      // Filter "b > colA"
+      // Translates to "b > colA_minValue" for index lookup
       case GreaterThan(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThan(minValue(colName), value))
-      // query filter "colA <= b"  convert it to "colA_minValue <= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThan(minValue(colName), value)
+      // Filter "colA <= b"
+      // Translates to "colA_minValue <= b" for index lookup
       case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
-      // query filter "b <= colA"  convert it to "colA_maxValue >= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThanOrEqual(minValue(colName), value)
+      // Filter "b <= colA"
+      // Translates to "b <= colA_maxValue" for index lookup
       case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
-      // query filter "colA >= b"   convert it to "colA_maxValue >= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThanOrEqual(maxValue(colName), value)
+      // Filter "colA >= b"
+      // Translates to "colA_maxValue >= b" for index lookup
       case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
-        val colName = getTargetColNameParts(attribute)
+        val colName = getTargetColName(attribute, indexSchema)
         GreaterThanOrEqual(maxValue(colName), right)
-      // query filter "b >= colA"   convert it to "colA_minValue <= b" for 
index table
+      // Filter "b >= colA"
+      // Translates to "b >= colA_minValue" for index lookup
       case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
-      // query filter "colA is null"   convert it to "colA_num_nulls > 0" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThanOrEqual(minValue(colName), value)
+      // Filter "colA is null"
+      // Translates to "colA_num_nulls > 0" for index lookup
       case IsNull(attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0)))
-      // query filter "colA is not null"   convert it to "colA_num_nulls = 0" 
for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(numNulls(colName), Literal(0))
+      // Filter "colA is not null"
+      // Translates to "colA_num_nulls = 0" for index lookup
       case IsNotNull(attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0)))
-      // query filter "colA in (a,b)"   convert it to " (colA_minValue <= a 
and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        EqualTo(numNulls(colName), Literal(0))
+      // Filter "colA in (a, b, ...)"
+      // Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR 
(colA_minValue <= b AND colA_maxValue >= b)" for index lookup
       case In(attribute: AttributeReference, list: Seq[Literal]) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, list.map { lit =>
-          And(LessThanOrEqual(minValue(colName), lit), 
GreaterThanOrEqual(maxValue(colName), lit))
-        }.reduce(Or))
-      // query filter "colA like xxx"   convert it to "  (colA_minValue <= xxx 
and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with 
xxx)  " for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiterals(colName, list)
+      // Filter "colA like xxx"
+      // Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for 
index lookup
+      // NOTE: That this operator only matches string prefixes, and this is
+      //       essentially equivalent to "colA = b" expression
       case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName), 
v), GreaterThanOrEqual(maxValue(colName), v)) ,
-          Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName), 
v))))
-      // query filter "colA not in (a, b)"   convert it to " (not( 
colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and 
colA_maxValue = b)) " for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiteral(colName, v)
+      // Filter "colA not in (a, b, ...)"
+      // Translates to "(colA_minValue > a OR colA_maxValue < a) AND 
(colA_minValue > b OR colA_maxValue < b)" for index lookup
+      // NOTE: This is an inversion of `in (a, b, ...)` expr
       case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, list.map { lit =>
-          Not(And(EqualTo(minValue(colName), lit), EqualTo(maxValue(colName), 
lit)))
-        }.reduce(And))
-      // query filter "colA != b"   convert it to "not ( colA_minValue = b and 
colA_maxValue = b )" for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        Not(colContainsValuesEqualToLiterals(colName, list))
+      // Filter "colA != b"
+      // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an 
inversion of expr for "colA = b") for index lookup
+      // NOTE: This is an inversion of `colA = b` expr

Review comment:
       Will address all negations

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
##########
@@ -29,148 +30,186 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
 
-object DataSkippingUtils {
+object DataSkippingUtils extends Logging {
 
   /**
-    * create z_index filter and push those filters to index table to filter 
all candidate scan files.
-    * @param condition  origin filter from query.
-    * @param indexSchema schema from index table.
-    * @return filters for index table.
-    */
-  def createZindexFilter(condition: Expression, indexSchema: StructType): 
Expression = {
-    def buildExpressionInternal(colName: Seq[String], statisticValue: String): 
Expression = {
-      val appendColName = UnresolvedAttribute(colName).name + statisticValue
-      col(appendColName).expr
-    }
-
-    def reWriteCondition(colName: Seq[String], conditionExpress: Expression): 
Expression = {
-      val appendColName = UnresolvedAttribute(colName).name + "_minValue"
-      if (indexSchema.exists(p => p.name == appendColName)) {
-        conditionExpress
-      } else {
-        Literal.TrueLiteral
-      }
-    }
-
-    val minValue = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_minValue")
-    val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_maxValue")
-    val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_num_nulls")
-
-    condition match {
-      // query filter "colA = b"  convert it to "colA_minValue <= b and 
colA_maxValue >= b" for index table
+   * Translates provided {@link filterExpr} into corresponding 
filter-expression for Z-index index table
+   * to filter out candidate files that would hold records matching the 
original filter
+   *
+   * @param filterExpr  original filter from query
+   * @param indexSchema index table schema
+   * @return filter for Z-index table
+   */
+  def createZIndexLookupFilter(filterExpr: Expression, indexSchema: 
StructType): Expression = {
+
+    def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr
+    def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr
+    def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr
+
+    def colContainsValuesEqualToLiteral(colName: String, value: Literal) =
+      // Only case when column C contains value V is when min(C) <= V <= max(c)
+      And(LessThanOrEqual(minValue(colName), value), 
GreaterThanOrEqual(maxValue(colName), value))
+
+    def colContainsValuesEqualToLiterals(colName: String, list: Seq[Literal]) =
+      // Only case when column C contains _any_ of the values V1, V2, etc is 
when either
+      //    min(C) <= V1 <= max(c) OR
+      //    min(C) <= V2 <= max(c) OR
+      //    ...
+      list.map { lit => colContainsValuesEqualToLiteral(colName, lit) 
}.reduce(Or)
+
+    filterExpr match {
+      // Filter "colA = b"
+      // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition 
for index lookup
       case EqualTo(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), 
value), GreaterThanOrEqual(maxValue(colName), value)))
-      // query filter "b = colA"  convert it to "colA_minValue <= b and 
colA_maxValue >= b" for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiteral(colName, value)
+      // Filter "b = colA"
+      // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition 
for index lookup
       case EqualTo(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), 
value), GreaterThanOrEqual(maxValue(colName), value)))
-      // query filter "colA = null"  convert it to "colA_num_nulls = null" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiteral(colName, value)
+      // Filter "colA = null"
+      // Translates to "colA_num_nulls = null" for index lookup
       case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ 
Literal(null, _)) =>
-        val colName = getTargetColNameParts(equalNullSafe.left)
-        reWriteCondition(colName, EqualTo(num_nulls(colName), 
equalNullSafe.right))
-      // query filter "colA < b"  convert it to "colA_minValue < b" for index 
table
+        val colName = getTargetColName(equalNullSafe.left, indexSchema)
+        EqualTo(numNulls(colName), equalNullSafe.right)
+      // Filter "colA < b"
+      // Translates to "colA_minValue < b" for index lookup
       case LessThan(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName,LessThan(minValue(colName), value))
-      // query filter "b < colA"  convert it to "colA_maxValue > b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThan(minValue(colName), value)
+      // Filter "b < colA"
+      // Translates to "b < colA_maxValue" for index lookup
       case LessThan(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
-      // query filter "colA > b"  convert it to "colA_maxValue > b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(maxValue(colName), value)
+      // Filter "colA > b"
+      // Translates to "colA_maxValue > b" for index lookup
       case GreaterThan(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
-      // query filter "b > colA"  convert it to "colA_minValue < b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(maxValue(colName), value)
+      // Filter "b > colA"
+      // Translates to "b > colA_minValue" for index lookup
       case GreaterThan(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThan(minValue(colName), value))
-      // query filter "colA <= b"  convert it to "colA_minValue <= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThan(minValue(colName), value)
+      // Filter "colA <= b"
+      // Translates to "colA_minValue <= b" for index lookup
       case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
-      // query filter "b <= colA"  convert it to "colA_maxValue >= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThanOrEqual(minValue(colName), value)
+      // Filter "b <= colA"
+      // Translates to "b <= colA_maxValue" for index lookup
       case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
-      // query filter "colA >= b"   convert it to "colA_maxValue >= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThanOrEqual(maxValue(colName), value)
+      // Filter "colA >= b"
+      // Translates to "colA_maxValue >= b" for index lookup
       case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
-        val colName = getTargetColNameParts(attribute)
+        val colName = getTargetColName(attribute, indexSchema)
         GreaterThanOrEqual(maxValue(colName), right)
-      // query filter "b >= colA"   convert it to "colA_minValue <= b" for 
index table
+      // Filter "b >= colA"
+      // Translates to "b >= colA_minValue" for index lookup
       case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
-      // query filter "colA is null"   convert it to "colA_num_nulls > 0" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThanOrEqual(minValue(colName), value)
+      // Filter "colA is null"
+      // Translates to "colA_num_nulls > 0" for index lookup
       case IsNull(attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0)))
-      // query filter "colA is not null"   convert it to "colA_num_nulls = 0" 
for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(numNulls(colName), Literal(0))
+      // Filter "colA is not null"
+      // Translates to "colA_num_nulls = 0" for index lookup
       case IsNotNull(attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0)))
-      // query filter "colA in (a,b)"   convert it to " (colA_minValue <= a 
and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        EqualTo(numNulls(colName), Literal(0))
+      // Filter "colA in (a, b, ...)"
+      // Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR 
(colA_minValue <= b AND colA_maxValue >= b)" for index lookup
       case In(attribute: AttributeReference, list: Seq[Literal]) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, list.map { lit =>
-          And(LessThanOrEqual(minValue(colName), lit), 
GreaterThanOrEqual(maxValue(colName), lit))
-        }.reduce(Or))
-      // query filter "colA like xxx"   convert it to "  (colA_minValue <= xxx 
and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with 
xxx)  " for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiterals(colName, list)
+      // Filter "colA like xxx"
+      // Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for 
index lookup
+      // NOTE: That this operator only matches string prefixes, and this is
+      //       essentially equivalent to "colA = b" expression
       case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName), 
v), GreaterThanOrEqual(maxValue(colName), v)) ,
-          Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName), 
v))))
-      // query filter "colA not in (a, b)"   convert it to " (not( 
colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and 
colA_maxValue = b)) " for index table
+        val colName = getTargetColName(attribute, indexSchema)

Review comment:
       Removed the second leg b/c it's included in the first one -- if column A 
contains string S that starts w/ prefix P, that entails that `min(A) <= P <= 
max(A)`

##########
File path: 
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/DataSkippingUtils.scala
##########
@@ -29,148 +30,186 @@ import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.unsafe.types.UTF8String
 
 import scala.collection.JavaConverters._
 
-object DataSkippingUtils {
+object DataSkippingUtils extends Logging {
 
   /**
-    * create z_index filter and push those filters to index table to filter 
all candidate scan files.
-    * @param condition  origin filter from query.
-    * @param indexSchema schema from index table.
-    * @return filters for index table.
-    */
-  def createZindexFilter(condition: Expression, indexSchema: StructType): 
Expression = {
-    def buildExpressionInternal(colName: Seq[String], statisticValue: String): 
Expression = {
-      val appendColName = UnresolvedAttribute(colName).name + statisticValue
-      col(appendColName).expr
-    }
-
-    def reWriteCondition(colName: Seq[String], conditionExpress: Expression): 
Expression = {
-      val appendColName = UnresolvedAttribute(colName).name + "_minValue"
-      if (indexSchema.exists(p => p.name == appendColName)) {
-        conditionExpress
-      } else {
-        Literal.TrueLiteral
-      }
-    }
-
-    val minValue = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_minValue")
-    val maxValue = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_maxValue")
-    val num_nulls = (colName: Seq[String]) => buildExpressionInternal(colName, 
"_num_nulls")
-
-    condition match {
-      // query filter "colA = b"  convert it to "colA_minValue <= b and 
colA_maxValue >= b" for index table
+   * Translates provided {@link filterExpr} into corresponding 
filter-expression for Z-index index table
+   * to filter out candidate files that would hold records matching the 
original filter
+   *
+   * @param filterExpr  original filter from query
+   * @param indexSchema index table schema
+   * @return filter for Z-index table
+   */
+  def createZIndexLookupFilter(filterExpr: Expression, indexSchema: 
StructType): Expression = {
+
+    def minValue(colName: String) = col(getMinColumnNameFor(colName)).expr
+    def maxValue(colName: String) = col(getMaxColumnNameFor(colName)).expr
+    def numNulls(colName: String) = col(getNumNullsColumnNameFor(colName)).expr
+
+    def colContainsValuesEqualToLiteral(colName: String, value: Literal) =
+      // Only case when column C contains value V is when min(C) <= V <= max(c)
+      And(LessThanOrEqual(minValue(colName), value), 
GreaterThanOrEqual(maxValue(colName), value))
+
+    def colContainsValuesEqualToLiterals(colName: String, list: Seq[Literal]) =
+      // Only case when column C contains _any_ of the values V1, V2, etc is 
when either
+      //    min(C) <= V1 <= max(c) OR
+      //    min(C) <= V2 <= max(c) OR
+      //    ...
+      list.map { lit => colContainsValuesEqualToLiteral(colName, lit) 
}.reduce(Or)
+
+    filterExpr match {
+      // Filter "colA = b"
+      // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition 
for index lookup
       case EqualTo(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), 
value), GreaterThanOrEqual(maxValue(colName), value)))
-      // query filter "b = colA"  convert it to "colA_minValue <= b and 
colA_maxValue >= b" for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiteral(colName, value)
+      // Filter "b = colA"
+      // Translates to "colA_minValue <= b AND colA_maxValue >= b" condition 
for index lookup
       case EqualTo(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, And(LessThanOrEqual(minValue(colName), 
value), GreaterThanOrEqual(maxValue(colName), value)))
-      // query filter "colA = null"  convert it to "colA_num_nulls = null" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiteral(colName, value)
+      // Filter "colA = null"
+      // Translates to "colA_num_nulls = null" for index lookup
       case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ 
Literal(null, _)) =>
-        val colName = getTargetColNameParts(equalNullSafe.left)
-        reWriteCondition(colName, EqualTo(num_nulls(colName), 
equalNullSafe.right))
-      // query filter "colA < b"  convert it to "colA_minValue < b" for index 
table
+        val colName = getTargetColName(equalNullSafe.left, indexSchema)
+        EqualTo(numNulls(colName), equalNullSafe.right)
+      // Filter "colA < b"
+      // Translates to "colA_minValue < b" for index lookup
       case LessThan(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName,LessThan(minValue(colName), value))
-      // query filter "b < colA"  convert it to "colA_maxValue > b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThan(minValue(colName), value)
+      // Filter "b < colA"
+      // Translates to "b < colA_maxValue" for index lookup
       case LessThan(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
-      // query filter "colA > b"  convert it to "colA_maxValue > b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(maxValue(colName), value)
+      // Filter "colA > b"
+      // Translates to "colA_maxValue > b" for index lookup
       case GreaterThan(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(maxValue(colName), value))
-      // query filter "b > colA"  convert it to "colA_minValue < b" for index 
table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(maxValue(colName), value)
+      // Filter "b > colA"
+      // Translates to "b > colA_minValue" for index lookup
       case GreaterThan(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThan(minValue(colName), value))
-      // query filter "colA <= b"  convert it to "colA_minValue <= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThan(minValue(colName), value)
+      // Filter "colA <= b"
+      // Translates to "colA_minValue <= b" for index lookup
       case LessThanOrEqual(attribute: AttributeReference, value: Literal) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
-      // query filter "b <= colA"  convert it to "colA_maxValue >= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThanOrEqual(minValue(colName), value)
+      // Filter "b <= colA"
+      // Translates to "b <= colA_maxValue" for index lookup
       case LessThanOrEqual(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThanOrEqual(maxValue(colName), value))
-      // query filter "colA >= b"   convert it to "colA_maxValue >= b" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThanOrEqual(maxValue(colName), value)
+      // Filter "colA >= b"
+      // Translates to "colA_maxValue >= b" for index lookup
       case GreaterThanOrEqual(attribute: AttributeReference, right: Literal) =>
-        val colName = getTargetColNameParts(attribute)
+        val colName = getTargetColName(attribute, indexSchema)
         GreaterThanOrEqual(maxValue(colName), right)
-      // query filter "b >= colA"   convert it to "colA_minValue <= b" for 
index table
+      // Filter "b >= colA"
+      // Translates to "b >= colA_minValue" for index lookup
       case GreaterThanOrEqual(value: Literal, attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, LessThanOrEqual(minValue(colName), value))
-      // query filter "colA is null"   convert it to "colA_num_nulls > 0" for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        LessThanOrEqual(minValue(colName), value)
+      // Filter "colA is null"
+      // Translates to "colA_num_nulls > 0" for index lookup
       case IsNull(attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, GreaterThan(num_nulls(colName), Literal(0)))
-      // query filter "colA is not null"   convert it to "colA_num_nulls = 0" 
for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        GreaterThan(numNulls(colName), Literal(0))
+      // Filter "colA is not null"
+      // Translates to "colA_num_nulls = 0" for index lookup
       case IsNotNull(attribute: AttributeReference) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, EqualTo(num_nulls(colName), Literal(0)))
-      // query filter "colA in (a,b)"   convert it to " (colA_minValue <= a 
and colA_maxValue >= a) or (colA_minValue <= b and colA_maxValue >= b) " for 
index table
+        val colName = getTargetColName(attribute, indexSchema)
+        EqualTo(numNulls(colName), Literal(0))
+      // Filter "colA in (a, b, ...)"
+      // Translates to "(colA_minValue <= a AND colA_maxValue >= a) OR 
(colA_minValue <= b AND colA_maxValue >= b)" for index lookup
       case In(attribute: AttributeReference, list: Seq[Literal]) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, list.map { lit =>
-          And(LessThanOrEqual(minValue(colName), lit), 
GreaterThanOrEqual(maxValue(colName), lit))
-        }.reduce(Or))
-      // query filter "colA like xxx"   convert it to "  (colA_minValue <= xxx 
and colA_maxValue >= xxx) or (colA_min start with xxx or colA_max start with 
xxx)  " for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiterals(colName, list)
+      // Filter "colA like xxx"
+      // Translates to "colA_minValue <= xxx AND colA_maxValue >= xxx" for 
index lookup
+      // NOTE: That this operator only matches string prefixes, and this is
+      //       essentially equivalent to "colA = b" expression
       case StartsWith(attribute, v @ Literal(_: UTF8String, _)) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, Or(And(LessThanOrEqual(minValue(colName), 
v), GreaterThanOrEqual(maxValue(colName), v)) ,
-          Or(StartsWith(minValue(colName), v), StartsWith(maxValue(colName), 
v))))
-      // query filter "colA not in (a, b)"   convert it to " (not( 
colA_minValue = a and colA_maxValue = a)) and (not( colA_minValue = b and 
colA_maxValue = b)) " for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        colContainsValuesEqualToLiteral(colName, v)
+      // Filter "colA not in (a, b, ...)"
+      // Translates to "(colA_minValue > a OR colA_maxValue < a) AND 
(colA_minValue > b OR colA_maxValue < b)" for index lookup
+      // NOTE: This is an inversion of `in (a, b, ...)` expr
       case Not(In(attribute: AttributeReference, list: Seq[Literal])) =>
-        val colName = getTargetColNameParts(attribute)
-        reWriteCondition(colName, list.map { lit =>
-          Not(And(EqualTo(minValue(colName), lit), EqualTo(maxValue(colName), 
lit)))
-        }.reduce(And))
-      // query filter "colA != b"   convert it to "not ( colA_minValue = b and 
colA_maxValue = b )" for index table
+        val colName = getTargetColName(attribute, indexSchema)
+        Not(colContainsValuesEqualToLiterals(colName, list))
+      // Filter "colA != b"
+      // Translates to "colA_minValue > b OR colA_maxValue < b" (which is an 
inversion of expr for "colA = b") for index lookup
+      // NOTE: This is an inversion of `colA = b` expr

Review comment:
       Great catch! 
   You're right -- we can't simply invert b/c we're looking only at "contained" 
metrics which we can't invert




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to