This is an automated email from the ASF dual-hosted git repository.

yamamuro pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new f3c1298  [SPARK-34833][SQL][FOLLOWUP] Handle outer references in all 
the places
f3c1298 is described below

commit f3c129827986ba06c8a9ab00bd687e8d025103d1
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Mar 26 09:10:03 2021 +0900

    [SPARK-34833][SQL][FOLLOWUP] Handle outer references in all the places
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/31940 . This PR 
generalizes the matching of attributes and outer references, so that outer 
references are handled everywhere.
    
    Note that, currently correlated subquery has a lot of limitations in Spark, 
and the newly covered cases are not possible to happen. So this PR is a code 
refactor.
    
    ### Why are the changes needed?
    
    code cleanup
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #31959 from cloud-fan/follow.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
    (cherry picked from commit 658e95c345d5aa2a98b8d2a854e003a5c77ed581)
    Signed-off-by: Takeshi Yamamuro <yamam...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 67 +++++++++++++---------
 1 file changed, 41 insertions(+), 26 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d490845..600a5af 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3919,6 +3919,14 @@ object UpdateOuterReferences extends Rule[LogicalPlan] {
  */
 object ApplyCharTypePadding extends Rule[LogicalPlan] {
 
+  object AttrOrOuterRef {
+    def unapply(e: Expression): Option[Attribute] = e match {
+      case a: Attribute => Some(a)
+      case OuterReference(a: Attribute) => Some(a)
+      case _ => None
+    }
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperatorsUp {
       case operator => operator.transformExpressionsUp {
@@ -3926,27 +3934,17 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
 
         // String literal is treated as char type when it's compared to a char 
type column.
         // We should pad the shorter one to the longer length.
-        case b @ BinaryComparison(attr: Attribute, lit) if lit.foldable =>
-          padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
-            b.withNewChildren(newChildren)
-          }.getOrElse(b)
-
-        case b @ BinaryComparison(lit, attr: Attribute) if lit.foldable =>
-          padAttrLitCmp(attr, attr.metadata, lit).map { newChildren =>
-            b.withNewChildren(newChildren.reverse)
-          }.getOrElse(b)
-
-        case b @ BinaryComparison(or @ OuterReference(attr: Attribute), lit) 
if lit.foldable =>
-          padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
+        case b @ BinaryComparison(e @ AttrOrOuterRef(attr), lit) if 
lit.foldable =>
+          padAttrLitCmp(e, attr.metadata, lit).map { newChildren =>
             b.withNewChildren(newChildren)
           }.getOrElse(b)
 
-        case b @ BinaryComparison(lit, or @ OuterReference(attr: Attribute)) 
if lit.foldable =>
-          padAttrLitCmp(or, attr.metadata, lit).map { newChildren =>
+        case b @ BinaryComparison(lit, e @ AttrOrOuterRef(attr)) if 
lit.foldable =>
+          padAttrLitCmp(e, attr.metadata, lit).map { newChildren =>
             b.withNewChildren(newChildren.reverse)
           }.getOrElse(b)
 
-        case i @ In(attr: Attribute, list)
+        case i @ In(e @ AttrOrOuterRef(attr), list)
           if attr.dataType == StringType && list.forall(_.foldable) =>
           CharVarcharUtils.getRawType(attr.metadata).flatMap {
             case CharType(length) =>
@@ -3955,7 +3953,7 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
               val literalCharLengths = literalChars.map(_.numChars())
               val targetLen = (length +: literalCharLengths).max
               Some(i.copy(
-                value = addPadding(attr, length, targetLen),
+                value = addPadding(e, length, targetLen),
                 list = list.zip(literalCharLengths).map {
                   case (lit, charLength) => addPadding(lit, charLength, 
targetLen)
                 } ++ nulls.map(Literal.create(_, StringType))))
@@ -3963,19 +3961,36 @@ object ApplyCharTypePadding extends Rule[LogicalPlan] {
           }.getOrElse(i)
 
         // For char type column or inner field comparison, pad the shorter one 
to the longer length.
-        case b @ BinaryComparison(left: Attribute, right: Attribute) =>
-          
b.withNewChildren(CharVarcharUtils.addPaddingInStringComparison(Seq(left, 
right)))
-
-        case b @ BinaryComparison(OuterReference(left: Attribute), right: 
Attribute) =>
-          b.withNewChildren(padOuterRefAttrCmp(left, right))
-
-        case b @ BinaryComparison(left: Attribute, OuterReference(right: 
Attribute)) =>
-          b.withNewChildren(padOuterRefAttrCmp(right, left).reverse)
+        case b @ BinaryComparison(e1 @ AttrOrOuterRef(left), e2 @ 
AttrOrOuterRef(right))
+            // For the same attribute, they must be the same length and no 
padding is needed.
+            if !left.semanticEquals(right) =>
+          val outerRefs = (e1, e2) match {
+            case (_: OuterReference, _: OuterReference) => Seq(left, right)
+            case (_: OuterReference, _) => Seq(left)
+            case (_, _: OuterReference) => Seq(right)
+            case _ => Nil
+          }
+          val newChildren = 
CharVarcharUtils.addPaddingInStringComparison(Seq(left, right))
+          if (outerRefs.nonEmpty) {
+            b.withNewChildren(newChildren.map(_.transform {
+              case a: Attribute if outerRefs.exists(_.semanticEquals(a)) => 
OuterReference(a)
+            }))
+          } else {
+            b.withNewChildren(newChildren)
+          }
 
-        case i @ In(attr: Attribute, list) if 
list.forall(_.isInstanceOf[Attribute]) =>
+        case i @ In(e @ AttrOrOuterRef(attr), list) if 
list.forall(_.isInstanceOf[Attribute]) =>
           val newChildren = CharVarcharUtils.addPaddingInStringComparison(
             attr +: list.map(_.asInstanceOf[Attribute]))
-          i.copy(value = newChildren.head, list = newChildren.tail)
+          if (e.isInstanceOf[OuterReference]) {
+            i.copy(
+              value = newChildren.head.transform {
+                case a: Attribute if a.semanticEquals(attr) => 
OuterReference(a)
+              },
+              list = newChildren.tail)
+          } else {
+            i.copy(value = newChildren.head, list = newChildren.tail)
+          }
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to