This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 5ecf306  [SPARK-34833][SQL] Apply right-padding correctly for 
correlated subqueries
5ecf306 is described below

commit 5ecf306245d17053e25b68c844828878a66b593a
Author: Takeshi Yamamuro <yamam...@apache.org>
AuthorDate: Thu Mar 25 08:31:57 2021 +0900

    [SPARK-34833][SQL] Apply right-padding correctly for correlated subqueries
    
    ### What changes were proposed in this pull request?
    
    This PR intends to fix a bug where right-padding is not applied for char 
types inside correlated subqueries.
    For example, the query below returns nothing in master, but the correct result 
is `c`.
    ```
    scala> sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING parquet")
    scala> sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(7)) USING parquet")
    scala> sql("INSERT INTO t1 VALUES ('c', 'b')")
    scala> sql("INSERT INTO t2 VALUES ('a', 'b')")
    scala> val df = sql("""
      |SELECT v FROM t1
      |WHERE 'a' IN (SELECT v FROM t2 WHERE t2.c = t1.c )""".stripMargin)
    
    scala> df.show()
    +---+
    |  v|
    +---+
    +---+
    
    ```
    
    This is because `ApplyCharTypePadding`  does not handle the case above to 
apply right-padding into `'abc'`. This PR modifies the code in 
`ApplyCharTypePadding` for handling it correctly.
    
    ```
    // Before this PR:
    scala> df.explain(true)
    == Analyzed Logical Plan ==
    v: string
    Project [v#13]
    +- Filter a IN (list#12 [c#14])
       :  +- Project [v#15]
       :     +- Filter (c#16 = outer(c#14))
       :        +- SubqueryAlias spark_catalog.default.t2
       :           +- Relation default.t2[v#15,c#16] parquet
       +- SubqueryAlias spark_catalog.default.t1
          +- Relation default.t1[v#13,c#14] parquet
    
    scala> df.show()
    +---+
    |  v|
    +---+
    +---+
    
    // After this PR:
    scala> df.explain(true)
    == Analyzed Logical Plan ==
    v: string
    Project [v#43]
    +- Filter a IN (list#42 [c#44])
       :  +- Project [v#45]
       :     +- Filter (c#46 = rpad(outer(c#44), 7,  ))
       :        +- SubqueryAlias spark_catalog.default.t2
       :           +- Relation default.t2[v#45,c#46] parquet
       +- SubqueryAlias spark_catalog.default.t1
          +- Relation default.t1[v#43,c#44] parquet
    
    scala> df.show()
    +---+
    |  v|
    +---+
    |  c|
    +---+
    ```
    
    This fix is related to TPCDS q17; the query returns nothing because of this 
bug: https://github.com/apache/spark/pull/31886/files#r599333799
    
    ### Why are the changes needed?
    
    Bugfix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit tests added.
    
    Closes #31940 from maropu/FixCharPadding.
    
    Authored-by: Takeshi Yamamuro <yamam...@apache.org>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
    (cherry picked from commit 150769bcedb6e4a97596e0f04d686482cd09e92a)
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 45 ++++++++++++++---
 .../apache/spark/sql/CharVarcharTestSuite.scala    | 57 ++++++++++++++++------
 2 files changed, 79 insertions(+), 23 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index f4cdeab..d490845 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3921,16 +3921,28 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperatorsUp {
-      case operator if operator.resolved => operator.transformExpressionsUp {
+      case operator => operator.transformExpressionsUp {
+        case e if !e.childrenResolved => e
+
         // String literal is treated as char type when it's compared to a char 
type column.
         // We should pad the shorter one to the longer length.
         case b @ BinaryComparison(attr: Attribute, lit) if lit.foldable =>
-          padAttrLitCmp(attr, lit).map { newChildren =>
+          padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
             b.withNewChildren(newChildren)
           }.getOrElse(b)
 
         case b @ BinaryComparison(lit, attr: Attribute) if lit.foldable =>
-          padAttrLitCmp(attr, lit).map { newChildren =>
+          padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
+            b.withNewChildren(newChildren.reverse)
+          }.getOrElse(b)
+
+        case b @ BinaryComparison(or @ OuterReference(attr: Attribute), lit) 
if lit.foldable =>
+          padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
+            b.withNewChildren(newChildren)
+          }.getOrElse(b)
+
+        case b @ BinaryComparison(lit, or @ OuterReference(attr: Attribute)) 
if lit.foldable =>
+          padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
             b.withNewChildren(newChildren.reverse)
           }.getOrElse(b)
 
@@ -3954,6 +3966,12 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
         case b @ BinaryComparison(left: Attribute, right: Attribute) =>
           
b.withNewChildren(CharVarcharUtils.addPaddingInStringComparison(Seq(left, 
right)))
 
+        case b @ BinaryComparison(OuterReference(left: Attribute), right: 
Attribute) =>
+          b.withNewChildren(padOuterRefAttrCmp(left, right))
+
+        case b @ BinaryComparison(left: Attribute, OuterReference(right: 
Attribute)) =>
+          b.withNewChildren(padOuterRefAttrCmp(right, left).reverse)
+
         case i @ In(attr: Attribute, list) if 
list.forall(_.isInstanceOf[Attribute]) =>
           val newChildren = CharVarcharUtils.addPaddingInStringComparison(
             attr +: list.map(_.asInstanceOf[Attribute]))
@@ -3962,9 +3980,12 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
     }
   }
 
-  private def padAttrLitCmp(attr: Attribute, lit: Expression): 
Option[Seq[Expression]] = {
-    if (attr.dataType == StringType) {
-      CharVarcharUtils.getRawType(attr.metadata).flatMap {
+  private def padAttrLitCmp(
+      expr: Expression,
+      metadata: Metadata,
+      lit: Expression): Option[Seq[Expression]] = {
+    if (expr.dataType == StringType) {
+      CharVarcharUtils.getRawType(metadata).flatMap {
         case CharType(length) =>
           val str = lit.eval().asInstanceOf[UTF8String]
           if (str == null) {
@@ -3972,9 +3993,9 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
           } else {
             val stringLitLen = str.numChars()
             if (length < stringLitLen) {
-              Some(Seq(StringRPad(attr, Literal(stringLitLen)), lit))
+              Some(Seq(StringRPad(expr, Literal(stringLitLen)), lit))
             } else if (length > stringLitLen) {
-              Some(Seq(attr, StringRPad(lit, Literal(length))))
+              Some(Seq(expr, StringRPad(lit, Literal(length))))
             } else {
               None
             }
@@ -3986,6 +4007,14 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
     }
   }
 
+  private def padOuterRefAttrCmp(outerAttr: Attribute, attr: Attribute): 
Seq[Expression] = {
+    val Seq(r, newAttr) = 
CharVarcharUtils.addPaddingInStringComparison(Seq(outerAttr, attr))
+    val newOuterRef = r.transform {
+      case ar: Attribute if ar.semanticEquals(outerAttr) => OuterReference(ar)
+    }
+    Seq(newOuterRef, newAttr)
+  }
+
   private def addPadding(expr: Expression, charLength: Int, targetLength: 
Int): Expression = {
     if (targetLength > charLength) StringRPad(expr, Literal(targetLength)) 
else expr
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
index 1775172..76f7f42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CharVarcharTestSuite.scala
@@ -584,21 +584,6 @@ trait CharVarcharTestSuite extends QueryTest with 
SQLTestUtils {
     }
   }
 
-  test("SPARK-33992: char/varchar resolution in correlated sub query") {
-    withTable("t1", "t2") {
-      sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING $format")
-      sql(s"CREATE TABLE t2(v VARCHAR(3), c CHAR(5)) USING $format")
-      sql("INSERT INTO t1 VALUES ('c', 'b')")
-      sql("INSERT INTO t2 VALUES ('a', 'b')")
-
-      checkAnswer(sql(
-        """
-          |SELECT v FROM t1
-          |WHERE 'a' IN (SELECT v FROM t2 WHERE t1.c = t2.c )""".stripMargin),
-        Row("c"))
-    }
-  }
-
   test("SPARK-34003: fix char/varchar fails w/ both group by and order by ") {
     withTable("t") {
       sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format")
@@ -633,6 +618,48 @@ trait CharVarcharTestSuite extends QueryTest with 
SQLTestUtils {
       checkAnswer(spark.table("t"), Row("c "))
     }
   }
+
+  test("SPARK-34833: right-padding applied correctly for correlated subqueries 
- join keys") {
+    withTable("t1", "t2") {
+      sql(s"CREATE TABLE t1(v VARCHAR(3), c CHAR(5)) USING $format")
+      sql(s"CREATE TABLE t2(v VARCHAR(5), c CHAR(8)) USING $format")
+      sql("INSERT INTO t1 VALUES ('c', 'b')")
+      sql("INSERT INTO t2 VALUES ('a', 'b')")
+      Seq("t1.c = t2.c", "t2.c = t1.c",
+        "t1.c = 'b'", "'b' = t1.c", "t1.c = 'b    '", "'b    ' = t1.c",
+        "t1.c = 'b      '", "'b      ' = t1.c").foreach { predicate =>
+        checkAnswer(sql(
+          s"""
+             |SELECT v FROM t1
+             |WHERE 'a' IN (SELECT v FROM t2 WHERE $predicate)
+           """.stripMargin),
+          Row("c"))
+      }
+    }
+  }
+
+  test("SPARK-34833: right-padding applied correctly for correlated subqueries 
- other preds") {
+    withTable("t") {
+      sql(s"CREATE TABLE t(c0 INT, c1 CHAR(5), c2 CHAR(7)) USING $format")
+      sql("INSERT INTO t VALUES (1, 'abc', 'abc')")
+      Seq("c1 = 'abc'", "'abc' = c1", "c1 = 'abc  '", "'abc  ' = c1",
+        "c1 = 'abc    '", "'abc    ' = c1", "c1 = c2", "c2 = c1",
+        "c1 IN ('xxx', 'abc', 'xxxxx')", "c1 IN ('xxx', 'abc  ', 'xxxxx')",
+        "c1 IN ('xxx', 'abc    ', 'xxxxx')",
+        "c1 IN (c2)", "c2 IN (c1)").foreach { predicate =>
+        checkAnswer(sql(
+          s"""
+             |SELECT c0 FROM t t1
+             |WHERE (
+             |  SELECT count(*) AS c
+             |  FROM t
+             |  WHERE c0 = t1.c0 AND $predicate
+             |) > 0
+         """.stripMargin),
+          Row(1))
+      }
+    }
+  }
 }
 
 // Some basic char/varchar tests which doesn't rely on table implementation.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to