This is an automated email from the ASF dual-hosted git repository. huaxingao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2e2b1ae1021 [SPARK-39784][SQL] Put Literal values on the right side of the data source filter after translating Catalyst Expression to data source filter 2e2b1ae1021 is described below commit 2e2b1ae1021bc4bc99f9749e05e4770be3aec43f Author: huaxingao <huaxin_...@apple.com> AuthorDate: Fri Jul 22 13:49:00 2022 -0700 [SPARK-39784][SQL] Put Literal values on the right side of the data source filter after translating Catalyst Expression to data source filter ### What changes were proposed in this pull request? Even though the literal value could be on either side of the filter, e.g. both `a > 1` and `1 < a` are valid, after translating Catalyst Expression to data source filter, we want the literal value on the right side so it's easier for the data source to handle these filters. We do this kind of normalization for V1 Filter. We should have the same behavior for V2 Filter. Before this PR, for the filters that have literal values on the left side, e.g. `1 > a`, we keep it as is. After this PR, we will normalize it to `a < 1` so the data source doesn't need to check each of the filters (and do the flip). ### Why are the changes needed? I think we should follow V1 Filter's behavior, normalize the filters during catalyst Expression to DS Filter translation time to make the literal values on the right side, so later on, data source doesn't need to check every single filter to figure out if it needs to flip the sides. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? new test Closes #37197 from huaxingao/flip. 
Authored-by: huaxingao <huaxin_...@apple.com> Signed-off-by: huaxingao <huaxin_...@apple.com> --- .../sql/catalyst/util/V2ExpressionBuilder.scala | 21 +++++++ .../datasources/v2/DataSourceV2StrategySuite.scala | 67 +++++++++++++++++++++- 2 files changed, 86 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala index 8bb65a88044..59cbcf48334 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala @@ -233,6 +233,10 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { val r = generateExpression(b.right) if (l.isDefined && r.isDefined) { b match { + case _: Predicate if isBinaryComparisonOperator(b.sqlOperator) && + l.get.isInstanceOf[LiteralValue[_]] && r.get.isInstanceOf[FieldReference] => + Some(new V2Predicate(flipComparisonOperatorName(b.sqlOperator), + Array[V2Expression](r.get, l.get))) case _: Predicate => Some(new V2Predicate(b.sqlOperator, Array[V2Expression](l.get, r.get))) case _ => @@ -408,6 +412,23 @@ class V2ExpressionBuilder(e: Expression, isPredicate: Boolean = false) { } case _ => None } + + private def isBinaryComparisonOperator(operatorName: String): Boolean = { + operatorName match { + case ">" | "<" | ">=" | "<=" | "=" | "<=>" => true + case _ => false + } + } + + private def flipComparisonOperatorName(operatorName: String): String = { + operatorName match { + case ">" => "<" + case "<" => ">" + case ">=" => "<=" + case "<=" => ">=" + case _ => operatorName + } + } } object ColumnOrField { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala index 66dc65cf681..c3f51bed269 
100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StrategySuite.scala @@ -18,14 +18,77 @@ package org.apache.spark.sql.execution.datasources.v2 import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue} import org.apache.spark.sql.connector.expressions.filter.Predicate import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.BooleanType +import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, StructField, StructType} class DataSourceV2StrategySuite extends PlanTest with SharedSparkSession { + val attrInts = Seq( + $"cint".int, + $"c.int".int, + GetStructField($"a".struct(StructType( + StructField("cstr", StringType, nullable = true) :: + StructField("cint", IntegerType, nullable = true) :: Nil)), 1, None), + GetStructField($"a".struct(StructType( + StructField("c.int", IntegerType, nullable = true) :: + StructField("cstr", StringType, nullable = true) :: Nil)), 0, None), + GetStructField($"a.b".struct(StructType( + StructField("cstr1", StringType, nullable = true) :: + StructField("cstr2", StringType, nullable = true) :: + StructField("cint", IntegerType, nullable = true) :: Nil)), 2, None), + GetStructField($"a.b".struct(StructType( + StructField("c.int", IntegerType, nullable = true) :: Nil)), 0, None), + GetStructField(GetStructField($"a".struct(StructType( + StructField("cstr1", StringType, nullable = true) :: + StructField("b", StructType(StructField("cint", IntegerType, nullable = true) :: + StructField("cstr2", StringType, nullable = true) :: Nil)) :: Nil)), 1, None), 0, None) + ).zip(Seq( + "cint", + "`c.int`", // single 
level field that contains `dot` in name + "a.cint", // two level nested field + "a.`c.int`", // two level nested field, and nested level contains `dot` + "`a.b`.cint", // two level nested field, and top level contains `dot` + "`a.b`.`c.int`", // two level nested field, and both levels contain `dot` + "a.b.cint" // three level nested field + )) + + test("SPARK-39784: translate binary expression") { attrInts + .foreach { case (attrInt, intColName) => + testTranslateFilter(EqualTo(attrInt, 1), + Some(new Predicate("=", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + testTranslateFilter(EqualTo(1, attrInt), + Some(new Predicate("=", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + + testTranslateFilter(EqualNullSafe(attrInt, 1), + Some(new Predicate("<=>", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + testTranslateFilter(EqualNullSafe(1, attrInt), + Some(new Predicate("<=>", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + + testTranslateFilter(GreaterThan(attrInt, 1), + Some(new Predicate(">", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + testTranslateFilter(GreaterThan(1, attrInt), + Some(new Predicate("<", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + + testTranslateFilter(LessThan(attrInt, 1), + Some(new Predicate("<", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + testTranslateFilter(LessThan(1, attrInt), + Some(new Predicate(">", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + + testTranslateFilter(GreaterThanOrEqual(attrInt, 1), + Some(new Predicate(">=", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + testTranslateFilter(GreaterThanOrEqual(1, attrInt), + Some(new Predicate("<=", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + + testTranslateFilter(LessThanOrEqual(attrInt, 1), + Some(new Predicate("<=", Array(FieldReference(intColName), 
LiteralValue(1, IntegerType))))) + testTranslateFilter(LessThanOrEqual(1, attrInt), + Some(new Predicate(">=", Array(FieldReference(intColName), LiteralValue(1, IntegerType))))) + } + } + test("SPARK-36644: Push down boolean column filter") { testTranslateFilter($"col".boolean, Some(new Predicate("=", Array(FieldReference("col"), LiteralValue(true, BooleanType))))) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org