This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 459c4b0c94a [SPARK-39144][SQL] Nested subquery expressions deduplicate relations should be done bottom up 459c4b0c94a is described below commit 459c4b0c94a39efe9ea8b5ef1da3f6e379417c40 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Tue May 24 13:05:29 2022 +0800 [SPARK-39144][SQL] Nested subquery expressions deduplicate relations should be done bottom up ### What changes were proposed in this pull request? When we have nested subquery expressions, there is a chance that deduplicate relations could replace an attribute with a wrong one. This is because the attribute replacement is done top down rather than bottom up. This could happen if the subplan gets deduplicate relations first (thus two copies of the same relation with different attribute IDs), then a more complex plan built on top of the subplan (e.g. a UNION of queries with nested subquery expressions) can trigger this wrong attribute replacement error. For a concrete example, please see the added unit test. ### Why are the changes needed? This is a bug that we can fix. Without this PR, we could hit an error where an outer attribute reference does not exist in the outer relation in certain scenarios. ### Does this PR introduce _any_ user-facing change? NO ### How was this patch tested? UT Closes #36503 from amaliujia/testnestedsubqueryexpression. 
Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> (cherry picked from commit d9fd36eb76fcfec95763cc4dc594eb7856b0fad2) Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/CheckAnalysis.scala | 18 ++++++++++ .../catalyst/analysis/DeduplicateRelations.scala | 26 +++++++-------- .../sql/catalyst/analysis/AnalysisSuite.scala | 38 ++++++++++++++++++++++ 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index b9f3b3b824b..9c72b9974c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -728,9 +728,27 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog { expressions.exists(_.exists(_.semanticEquals(expr))) } + def checkOuterReference(p: LogicalPlan, expr: SubqueryExpression): Unit = p match { + case f: Filter => + if (hasOuterReferences(expr.plan)) { + expr.plan.expressions.foreach(_.foreachUp { + case o: OuterReference => + p.children.foreach(e => + if (!e.output.exists(_.exprId == o.exprId)) { + failAnalysis("outer attribute not found") + }) + case _ => + }) + } + case _ => + } + // Validate the subquery plan. checkAnalysis(expr.plan) + // Check if there is outer attribute that cannot be found from the plan. + checkOuterReference(plan, expr) + expr match { case ScalarSubquery(query, outerAttrs, _, _) => // Scalar subquery must return one column as output. 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 4c351e3237d..aed19f2499f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -125,9 +125,18 @@ object DeduplicateRelations extends Rule[LogicalPlan] { } } + val planWithNewSubquery = plan.transformExpressions { + case subquery: SubqueryExpression => + val (renewed, collected, changed) = renewDuplicatedRelations( + existingRelations ++ relations, subquery.plan) + relations ++= collected + if (changed) planChanged = true + subquery.withNewPlan(renewed) + } + if (planChanged) { - if (plan.childrenResolved) { - val planWithNewChildren = plan.withNewChildren(newChildren.toSeq) + if (planWithNewSubquery.childrenResolved) { + val planWithNewChildren = planWithNewSubquery.withNewChildren(newChildren.toSeq) val attrMap = AttributeMap( plan .children @@ -140,7 +149,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] { planWithNewChildren.rewriteAttrs(attrMap) } } else { - plan.withNewChildren(newChildren.toSeq) + planWithNewSubquery.withNewChildren(newChildren.toSeq) } } else { plan @@ -148,16 +157,7 @@ object DeduplicateRelations extends Rule[LogicalPlan] { } else { plan } - - val planWithNewSubquery = newPlan.transformExpressions { - case subquery: SubqueryExpression => - val (renewed, collected, changed) = renewDuplicatedRelations( - existingRelations ++ relations, subquery.plan) - relations ++= collected - if (changed) planChanged = true - subquery.withNewPlan(renewed) - } - (planWithNewSubquery, relations, planChanged) + (newPlan, relations, planChanged) } /** diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index fff25b59eff..1f82aa7e355 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -1176,4 +1176,42 @@ class AnalysisSuite extends AnalysisTest with Matchers { false) } } + + test("SPARK-39144: nested subquery expressions deduplicate relations should be done bottom up") { + val innerRelation = SubqueryAlias("src1", testRelation) + val outerRelation = SubqueryAlias("src2", testRelation) + val ref1 = testRelation.output.head + + val subPlan = getAnalyzer.execute( + Project( + Seq(UnresolvedStar(None)), + Filter.apply( + Exists( + Filter.apply( + EqualTo( + OuterReference(ref1), + ref1), + innerRelation + ) + ), + outerRelation + ))) + + val finalPlan = { + Union.apply( + Project( + Seq(UnresolvedStar(None)), + subPlan + ), + Filter.apply( + Exists( + subPlan + ), + subPlan + ) + ) + } + + assertAnalysisSuccess(finalPlan) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org