This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d9fd36eb76f [SPARK-39144][SQL] Nested subquery expressions deduplicate 
relations should be done bottom up
d9fd36eb76f is described below

commit d9fd36eb76fcfec95763cc4dc594eb7856b0fad2
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Tue May 24 13:05:29 2022 +0800

    [SPARK-39144][SQL] Nested subquery expressions deduplicate relations should 
be done bottom up
    
    ### What changes were proposed in this pull request?
    
    When we have nested subquery expressions, there is a chance that the 
deduplicate-relations rule replaces an attribute with a wrong one. This is 
because the attribute replacement is done top down rather than bottom up. This 
can happen if a subplan has its relations deduplicated first (so the same 
relation appears twice with different attribute IDs), and then a more complex 
plan built on top of that subplan (e.g. a UNION of queries with nested 
subquery expressions) triggers the wrong attribute replacement.
    
    For concrete example please see the added unit test.
    
    ### Why are the changes needed?
    
    This is a bug that we can fix. Without this PR, in certain scenarios we 
could end up with an outer attribute reference that does not exist in the 
outer relation.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    Added a unit test in AnalysisSuite.
    
    Closes #36503 from amaliujia/testnestedsubqueryexpression.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 18 ++++++++++
 .../catalyst/analysis/DeduplicateRelations.scala   | 26 +++++++--------
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 38 ++++++++++++++++++++++
 3 files changed, 69 insertions(+), 13 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index f827e9effe9..18dd842dd93 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -745,9 +745,27 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
       expressions.exists(_.exists(_.semanticEquals(expr)))
     }
 
+    def checkOuterReference(p: LogicalPlan, expr: SubqueryExpression): Unit = 
p match {
+      case f: Filter =>
+        if (hasOuterReferences(expr.plan)) {
+          expr.plan.expressions.foreach(_.foreachUp {
+            case o: OuterReference =>
+              p.children.foreach(e =>
+                if (!e.output.exists(_.exprId == o.exprId)) {
+                  failAnalysis("outer attribute not found")
+                })
+            case _ =>
+          })
+        }
+      case _ =>
+    }
+
     // Validate the subquery plan.
     checkAnalysis(expr.plan)
 
+    // Check if there is outer attribute that cannot be found from the plan.
+    checkOuterReference(plan, expr)
+
     expr match {
       case ScalarSubquery(query, outerAttrs, _, _) =>
         // Scalar subquery must return one column as output.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index 4c351e3237d..aed19f2499f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -125,9 +125,18 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
 
+        val planWithNewSubquery = plan.transformExpressions {
+          case subquery: SubqueryExpression =>
+            val (renewed, collected, changed) = renewDuplicatedRelations(
+              existingRelations ++ relations, subquery.plan)
+            relations ++= collected
+            if (changed) planChanged = true
+            subquery.withNewPlan(renewed)
+        }
+
         if (planChanged) {
-          if (plan.childrenResolved) {
-            val planWithNewChildren = plan.withNewChildren(newChildren.toSeq)
+          if (planWithNewSubquery.childrenResolved) {
+            val planWithNewChildren = 
planWithNewSubquery.withNewChildren(newChildren.toSeq)
             val attrMap = AttributeMap(
               plan
                 .children
@@ -140,7 +149,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
               planWithNewChildren.rewriteAttrs(attrMap)
             }
           } else {
-            plan.withNewChildren(newChildren.toSeq)
+            planWithNewSubquery.withNewChildren(newChildren.toSeq)
           }
         } else {
           plan
@@ -148,16 +157,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
       } else {
         plan
       }
-
-      val planWithNewSubquery = newPlan.transformExpressions {
-        case subquery: SubqueryExpression =>
-          val (renewed, collected, changed) = renewDuplicatedRelations(
-            existingRelations ++ relations, subquery.plan)
-          relations ++= collected
-          if (changed) planChanged = true
-          subquery.withNewPlan(renewed)
-      }
-      (planWithNewSubquery, relations, planChanged)
+      (newPlan, relations, planChanged)
   }
 
   /**
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 2cc6c53c4ba..84f9c6c5e76 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -1176,4 +1176,42 @@ class AnalysisSuite extends AnalysisTest with Matchers {
         false)
     }
   }
+
+  test("SPARK-39144: nested subquery expressions deduplicate relations should 
be done bottom up") {
+    val innerRelation = SubqueryAlias("src1", testRelation)
+    val outerRelation = SubqueryAlias("src2", testRelation)
+    val ref1 = testRelation.output.head
+
+    val subPlan = getAnalyzer.execute(
+      Project(
+        Seq(UnresolvedStar(None)),
+        Filter.apply(
+          Exists(
+            Filter.apply(
+              EqualTo(
+                OuterReference(ref1),
+                ref1),
+              innerRelation
+            )
+          ),
+          outerRelation
+        )))
+
+    val finalPlan = {
+      Union.apply(
+        Project(
+          Seq(UnresolvedStar(None)),
+          subPlan
+        ),
+        Filter.apply(
+          Exists(
+            subPlan
+          ),
+          subPlan
+        )
+      )
+    }
+
+    assertAnalysisSuccess(finalPlan)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to