This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push: new 5ecf306 [SPARK-34833][SQL] Apply right-padding correctly for correlated subqueries 5ecf306 is described below commit 5ecf306245d17053e25b68c844828878a66b593a Author: Takeshi Yamamuro <yamam...@apache.org> AuthorDate: Thu Mar 25 08:31:57 2021 +0900 [SPARK-34833][SQL] Apply right-padding correctly for correlated subqueries ### What changes were proposed in this pull request? This PR intends to fix a bug where right-padding is not applied to char-type columns inside correlated subqueries. For example, the query below returns nothing in master, but the correct result is `c`. ``` scala> sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING parquet") scala> sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(7)) USING parquet") scala> sql("INSERT INTO t1 VALUES ('c', 'b')") scala> sql("INSERT INTO t2 VALUES ('a', 'b')") scala> val df = sql(""" |SELECT v FROM t1 |WHERE 'a' IN (SELECT v FROM t2 WHERE t2.c = t1.c )""".stripMargin) scala> df.show() +---+ | v| +---+ +---+ ``` This is because `ApplyCharTypePadding` does not handle the case above, so it never right-pads the outer reference `t1.c` (a `CHAR(5)` column) to the length of the `CHAR(7)` column `t2.c` it is compared with. This PR modifies `ApplyCharTypePadding` to handle this case correctly. 
``` // Before this PR: scala> df.explain(true) == Analyzed Logical Plan == v: string Project [v#13] +- Filter a IN (list#12 [c#14]) : +- Project [v#15] : +- Filter (c#16 = outer(c#14)) : +- SubqueryAlias spark_catalog.default.t2 : +- Relation default.t2[v#15,c#16] parquet +- SubqueryAlias spark_catalog.default.t1 +- Relation default.t1[v#13,c#14] parquet scala> df.show() +---+ | v| +---+ +---+ // After this PR: scala> df.explain(true) == Analyzed Logical Plan == v: string Project [v#43] +- Filter a IN (list#42 [c#44]) : +- Project [v#45] : +- Filter (c#46 = rpad(outer(c#44), 7, )) : +- SubqueryAlias spark_catalog.default.t2 : +- Relation default.t2[v#45,c#46] parquet +- SubqueryAlias spark_catalog.default.t1 +- Relation default.t1[v#43,c#44] parquet scala> df.show() +---+ | v| +---+ | c| +---+ ``` This fix is related to TPCDS q17; that query returns nothing because of this bug: https://github.com/apache/spark/pull/31886/files#r599333799 ### Why are the changes needed? Bugfix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit tests added. Closes #31940 from maropu/FixCharPadding. 
Authored-by: Takeshi Yamamuro <yamam...@apache.org> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> (cherry picked from commit 150769bcedb6e4a97596e0f04d686482cd09e92a) Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../spark/sql/catalyst/analysis/Analyzer.scala | 45 ++++++++++++++--- .../apache/spark/sql/CharVarcharTestSuite.scala | 57 ++++++++++++++++------ 2 files changed, 79 insertions(+), 23 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f4cdeab..d490845 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -3921,16 +3921,28 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { plan.resolveOperatorsUp { - case operator if operator.resolved => operator.transformExpressionsUp { + case operator => operator.transformExpressionsUp { + case e if !e.childrenResolved => e + // String literal is treated as char type when it's compared to a char type column. // We should pad the shorter one to the longer length. 
case b @ BinaryComparison(attr: Attribute, lit) if lit.foldable => - padAttrLitCmp(attr, lit).map { newChildren => + padAttrLitCmp(attr, attr.metadata, lit).map { newChildren => b.withNewChildren(newChildren) }.getOrElse(b) case b @ BinaryComparison(lit, attr: Attribute) if lit.foldable => - padAttrLitCmp(attr, lit).map { newChildren => + padAttrLitCmp(attr, attr.metadata, lit).map { newChildren => + b.withNewChildren(newChildren.reverse) + }.getOrElse(b) + + case b @ BinaryComparison(or @ OuterReference(attr: Attribute), lit) if lit.foldable => + padAttrLitCmp(or, attr.metadata, lit).map { newChildren => + b.withNewChildren(newChildren) + }.getOrElse(b) + + case b @ BinaryComparison(lit, or @ OuterReference(attr: Attribute)) if lit.foldable => + padAttrLitCmp(or, attr.metadata, lit).map { newChildren => b.withNewChildren(newChildren.reverse) }.getOrElse(b) @@ -3954,6 +3966,12 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { case b @ BinaryComparison(left: Attribute, right: Attribute) => b.withNewChildren(CharVarcharUtils.addPaddingInStringComparison(Seq(left, right))) + case b @ BinaryComparison(OuterReference(left: Attribute), right: Attribute) => + b.withNewChildren(padOuterRefAttrCmp(left, right)) + + case b @ BinaryComparison(left: Attribute, OuterReference(right: Attribute)) => + b.withNewChildren(padOuterRefAttrCmp(right, left).reverse) + case i @ In(attr: Attribute, list) if list.forall(_.isInstanceOf[Attribute]) => val newChildren = CharVarcharUtils.addPaddingInStringComparison( attr +: list.map(_.asInstanceOf[Attribute])) @@ -3962,9 +3980,12 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { } } - private def padAttrLitCmp(attr: Attribute, lit: Expression): Option[Seq[Expression]] = { - if (attr.dataType == StringType) { - CharVarcharUtils.getRawType(attr.metadata).flatMap { + private def padAttrLitCmp( + expr: Expression, + metadata: Metadata, + lit: Expression): Option[Seq[Expression]] = { + if (expr.dataType == StringType) { + 
CharVarcharUtils.getRawType(metadata).flatMap { case CharType(length) => val str = lit.eval().asInstanceOf[UTF8String] if (str == null) { @@ -3972,9 +3993,9 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { } else { val stringLitLen = str.numChars() if (length < stringLitLen) { - Some(Seq(StringRPad(attr, Literal(stringLitLen)), lit)) + Some(Seq(StringRPad(expr, Literal(stringLitLen)), lit)) } else if (length > stringLitLen) { - Some(Seq(attr, StringRPad(lit, Literal(length)))) + Some(Seq(expr, StringRPad(lit, Literal(length)))) } else { None } @@ -3986,6 +4007,14 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] { } } + private def padOuterRefAttrCmp(outerAttr: Attribute, attr: Attribute): Seq[Expression] = { + val Seq(r, newAttr) = CharVarcharUtils.addPaddingInStringComparison(Seq(outerAttr, attr)) + val newOuterRef = r.transform { + case ar: Attribute if ar.semanticEquals(outerAttr) => OuterReference(ar) + } + Seq(newOuterRef, newAttr) + } + private def addPadding(expr: Expression, charLength: Int, targetLength: Int): Expression = { if (targetLength > charLength) StringRPad(expr, Literal(targetLength)) else expr } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala index 1775172..76f7f42 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala @@ -584,21 +584,6 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { } } - test("SPARK-33992: char/varchar resolution in correlated sub query") { - withTable("t1", "t2") { - sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING $format") - sql(s"CREATE TABLE t2(v VARCHAR(3), c CHAR(5)) USING $format") - sql("INSERT INTO t1 VALUES ('c', 'b')") - sql("INSERT INTO t2 VALUES ('a', 'b')") - - checkAnswer(sql( - """ - |SELECT v FROM t1 - |WHERE 'a' IN (SELECT v FROM t2 WHERE t1.c = t2.c 
)""".stripMargin), - Row("c")) - } - } - test("SPARK-34003: fix char/varchar fails w/ both group by and order by ") { withTable("t") { sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format") @@ -633,6 +618,48 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils { checkAnswer(spark.table("t"), Row("c ")) } } + + test("SPARK-34833: right-padding applied correctly for correlated subqueries - join keys") { + withTable("t1", "t2") { + sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING $format") + sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(8)) USING $format") + sql("INSERT INTO t1 VALUES ('c', 'b')") + sql("INSERT INTO t2 VALUES ('a', 'b')") + Seq("t1.c = t2.c", "t2.c = t1.c", + "t1.c = 'b'", "'b' = t1.c", "t1.c = 'b '", "'b ' = t1.c", + "t1.c = 'b '", "'b ' = t1.c").foreach { predicate => + checkAnswer(sql( + s""" + |SELECT v FROM t1 + |WHERE 'a' IN (SELECT v FROM t2 WHERE $predicate) + """.stripMargin), + Row("c")) + } + } + } + + test("SPARK-34833: right-padding applied correctly for correlated subqueries - other preds") { + withTable("t") { + sql(s"CREATE TABLE t(c0 INT, c1 CHAR(5), c2 CHAR(7)) USING $format") + sql("INSERT INTO t VALUES (1, 'abc', 'abc')") + Seq("c1 = 'abc'", "'abc' = c1", "c1 = 'abc '", "'abc ' = c1", + "c1 = 'abc '", "'abc ' = c1", "c1 = c2", "c2 = c1", + "c1 IN ('xxx', 'abc', 'xxxxx')", "c1 IN ('xxx', 'abc ', 'xxxxx')", + "c1 IN ('xxx', 'abc ', 'xxxxx')", + "c1 IN (c2)", "c2 IN (c1)").foreach { predicate => + checkAnswer(sql( + s""" + |SELECT c0 FROM t t1 + |WHERE ( + | SELECT count(*) AS c + | FROM t + | WHERE c0 = t1.c0 AND $predicate + |) > 0 + """.stripMargin), + Row(1)) + } + } + } } // Some basic char/varchar tests which doesn't rely on table implementation. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org