This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 459c4b0c94a [SPARK-39144][SQL] Nested subquery expressions deduplicate 
relations should be done bottom up
459c4b0c94a is described below

commit 459c4b0c94a39efe9ea8b5ef1da3f6e379417c40
Author: Rui Wang <rui.w...@databricks.com>
AuthorDate: Tue May 24 13:05:29 2022 +0800

    [SPARK-39144][SQL] Nested subquery expressions deduplicate relations should 
be done bottom up
    
    ### What changes were proposed in this pull request?
    
    When we have nested subquery expressions, there is a chance that 
deduplicate relations could replace an attributes with a wrong one. This is 
because the attributes replacement is done by top down than bottom up. This 
could happen if the subplan gets deduplicate relations first (thus two same 
relation with different attributes id), then a more complex plan built on top 
of the subplan (e.g. a UNION of queries with nested subquery expressions) can 
trigger this wrong attribute replacement error.
    
    For concrete example please see the added unit test.
    
    ### Why are the changes needed?
    
    This is bug that we can fix. Without this PR, we could hit that outer 
attribute reference does not exist in the outer relation at certain scenario.
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    UT
    
    Closes #36503 from amaliujia/testnestedsubqueryexpression.
    
    Authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit d9fd36eb76fcfec95763cc4dc594eb7856b0fad2)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 18 ++++++++++
 .../catalyst/analysis/DeduplicateRelations.scala   | 26 +++++++--------
 .../sql/catalyst/analysis/AnalysisSuite.scala      | 38 ++++++++++++++++++++++
 3 files changed, 69 insertions(+), 13 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index b9f3b3b824b..9c72b9974c4 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -728,9 +728,27 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
       expressions.exists(_.exists(_.semanticEquals(expr)))
     }
 
+    def checkOuterReference(p: LogicalPlan, expr: SubqueryExpression): Unit = 
p match {
+      case f: Filter =>
+        if (hasOuterReferences(expr.plan)) {
+          expr.plan.expressions.foreach(_.foreachUp {
+            case o: OuterReference =>
+              p.children.foreach(e =>
+                if (!e.output.exists(_.exprId == o.exprId)) {
+                  failAnalysis("outer attribute not found")
+                })
+            case _ =>
+          })
+        }
+      case _ =>
+    }
+
     // Validate the subquery plan.
     checkAnalysis(expr.plan)
 
+    // Check if there is outer attribute that cannot be found from the plan.
+    checkOuterReference(plan, expr)
+
     expr match {
       case ScalarSubquery(query, outerAttrs, _, _) =>
         // Scalar subquery must return one column as output.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
index 4c351e3237d..aed19f2499f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
@@ -125,9 +125,18 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           }
         }
 
+        val planWithNewSubquery = plan.transformExpressions {
+          case subquery: SubqueryExpression =>
+            val (renewed, collected, changed) = renewDuplicatedRelations(
+              existingRelations ++ relations, subquery.plan)
+            relations ++= collected
+            if (changed) planChanged = true
+            subquery.withNewPlan(renewed)
+        }
+
         if (planChanged) {
-          if (plan.childrenResolved) {
-            val planWithNewChildren = plan.withNewChildren(newChildren.toSeq)
+          if (planWithNewSubquery.childrenResolved) {
+            val planWithNewChildren = 
planWithNewSubquery.withNewChildren(newChildren.toSeq)
             val attrMap = AttributeMap(
               plan
                 .children
@@ -140,7 +149,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
               planWithNewChildren.rewriteAttrs(attrMap)
             }
           } else {
-            plan.withNewChildren(newChildren.toSeq)
+            planWithNewSubquery.withNewChildren(newChildren.toSeq)
           }
         } else {
           plan
@@ -148,16 +157,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
       } else {
         plan
       }
-
-      val planWithNewSubquery = newPlan.transformExpressions {
-        case subquery: SubqueryExpression =>
-          val (renewed, collected, changed) = renewDuplicatedRelations(
-            existingRelations ++ relations, subquery.plan)
-          relations ++= collected
-          if (changed) planChanged = true
-          subquery.withNewPlan(renewed)
-      }
-      (planWithNewSubquery, relations, planChanged)
+      (newPlan, relations, planChanged)
   }
 
   /**
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index fff25b59eff..1f82aa7e355 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -1176,4 +1176,42 @@ class AnalysisSuite extends AnalysisTest with Matchers {
         false)
     }
   }
+
+  test("SPARK-39144: nested subquery expressions deduplicate relations should 
be done bottom up") {
+    val innerRelation = SubqueryAlias("src1", testRelation)
+    val outerRelation = SubqueryAlias("src2", testRelation)
+    val ref1 = testRelation.output.head
+
+    val subPlan = getAnalyzer.execute(
+      Project(
+        Seq(UnresolvedStar(None)),
+        Filter.apply(
+          Exists(
+            Filter.apply(
+              EqualTo(
+                OuterReference(ref1),
+                ref1),
+              innerRelation
+            )
+          ),
+          outerRelation
+        )))
+
+    val finalPlan = {
+      Union.apply(
+        Project(
+          Seq(UnresolvedStar(None)),
+          subPlan
+        ),
+        Filter.apply(
+          Exists(
+            subPlan
+          ),
+          subPlan
+        )
+      )
+    }
+
+    assertAnalysisSuccess(finalPlan)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to