This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 85bb7bf008d [SPARK-39216][SQL] Do not collapse projects in CombineUnions if it hasCorrelatedSubquery 85bb7bf008d is described below commit 85bb7bf008d0346feaedc2aab55857d8f1b19908 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Wed May 18 23:37:25 2022 -0700 [SPARK-39216][SQL] Do not collapse projects in CombineUnions if it hasCorrelatedSubquery ### What changes were proposed in this pull request? Make `CombineUnions` skip collapsing two adjacent projects when either project list contains a correlated subquery (`SubqueryExpression.hasCorrelatedSubquery`). For example: ```sql SELECT (SELECT IF(x, 1, 0)) AS a FROM (SELECT true) t(x) UNION SELECT 1 AS a ``` Without this change, the query above throws an exception: ``` java.lang.IllegalStateException: Couldn't find x#4 in [] ``` ### Why are the changes needed? Fix bug. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #36595 from wangyum/SPARK-39216. Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 4 +++- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 25 ++++++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 2f93cf2d8c3..6b9746a880f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1357,7 +1357,9 @@ object CombineUnions extends Rule[LogicalPlan] { while (stack.nonEmpty) { stack.pop() match { case p1 @ Project(_, p2: Project) - if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline = false) => + if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline = false) && + !p1.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) 
&& + !p2.projectList.exists(SubqueryExpression.hasCorrelatedSubquery) => val newProjectList = buildCleanedProjectList(p1.projectList, p2.projectList) stack.pushAll(Seq(p2.copy(projectList = newProjectList))) case Distinct(Union(children, byName, allowMissingCol)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 72897d15302..2bfaa5b22d5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -4431,6 +4431,31 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark )) } } + + test("SPARK-39216: Don't collapse projects in CombineUnions if it hasCorrelatedSubquery") { + checkAnswer( + sql( + """ + |SELECT (SELECT IF(x, 1, 0)) AS a + |FROM (SELECT true) t(x) + |UNION + |SELECT 1 AS a + """.stripMargin), + Seq(Row(1))) + + checkAnswer( + sql( + """ + |SELECT x + 1 + |FROM (SELECT id + | + (SELECT Max(id) + | FROM range(2)) AS x + | FROM range(1)) t + |UNION + |SELECT 1 AS a + """.stripMargin), + Seq(Row(2), Row(1))) + } } case class Foo(bar: Option[String]) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org