This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a633f77 [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations a633f77 is described below commit a633f77b5120d94eee6beff8615137bf537bbff9 Author: chenzhx <c...@apache.org> AuthorDate: Wed Mar 2 01:23:05 2022 +0800 [SPARK-37932][SQL] Wait to resolve missing attributes before applying DeduplicateRelations ### What changes were proposed in this pull request? When a query joins a view with itself, for example: ``` SELECT l1.id FROM v1 l1 INNER JOIN ( SELECT id FROM v1 GROUP BY id HAVING COUNT(DISTINCT name) > 1 ) l2 ON l1.id = l2.id GROUP BY l1.name, l1.id; ``` analysis fails with the following error: ``` Resolved attribute(s) name#26 missing from id#31,name#32 in operator !Aggregate [id#31], [id#31, count(distinct name#26) AS count(distinct name#26)#33L]. Attribute(s) with the same name appear in the operation: name. Please check if the right attribute(s) are used.; Aggregate [name#26, id#25], [id#25] +- Join Inner, (id#25 = id#31) :- SubqueryAlias l1 : +- SubqueryAlias spark_catalog.default.v1 : +- View (`default`.`v1`, [id#25,name#26]) : +- Project [cast(id#20 as int) AS id#25, cast(name#21 as string) AS name#26] : +- Project [id#20, name#21] : +- SubqueryAlias spark_catalog.default.t : +- Relation default.t[id#20,name#21] parquet +- SubqueryAlias l2 +- Project [id#31] +- Filter (count(distinct name#26)#33L > cast(1 as bigint)) +- !Aggregate [id#31], [id#31, count(distinct name#26) AS count(distinct name#26)#33L] +- SubqueryAlias spark_catalog.default.v1 +- View (`default`.`v1`, [id#31,name#32]) +- Project [cast(id#27 as int) AS id#31, cast(name#28 as string) AS name#32] +- Project [id#27, name#28] +- SubqueryAlias spark_catalog.default.t +- Relation default.t[id#27,name#28] parquet ``` Spark will consider the two views to be duplicates, which will cause the query to fail. ### Why are the changes needed? To fix a bug that occurs when a query joins a view with itself. 
### Does this PR introduce _any_ user-facing change? Yes. A query that joins a view with itself now succeeds. DeduplicateRelations should only kick in if the plan's children are all resolved and valid. ### How was this patch tested? Added a new unit test. Closes #35684 from chenzhx/SPARK-37932. Authored-by: chenzhx <c...@apache.org> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/analysis/DeduplicateRelations.scala | 7 ++++++- .../org/apache/spark/sql/execution/SQLViewSuite.scala | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala index 55b1c22..4c351e3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala @@ -40,7 +40,12 @@ case class ReferenceEqualPlanWrapper(plan: LogicalPlan) { object DeduplicateRelations extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { - renewDuplicatedRelations(mutable.HashSet.empty, plan)._1.resolveOperatorsUpWithPruning( + val newPlan = renewDuplicatedRelations(mutable.HashSet.empty, plan)._1 + if (newPlan.find(p => p.resolved && p.missingInput.nonEmpty).isDefined) { + // Wait for `ResolveMissingReferences` to resolve missing attributes first + return newPlan + } + newPlan.resolveOperatorsUpWithPruning( _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND), ruleId) { case p: LogicalPlan if !p.childrenResolved => p diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala index ee6d352..9e6974a 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala @@ -907,4 +907,23 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils { } } } + + test("SPARK-37932: view join with same view") { + withTable("t") { + withView("v1") { + Seq((1, "test1"), (2, "test2"), (1, "test2")).toDF("id", "name") + .write.format("parquet").saveAsTable("t") + sql("CREATE VIEW v1 (id, name) AS SELECT id, name FROM t") + + checkAnswer( + sql("""SELECT l1.id FROM v1 l1 + |INNER JOIN ( + | SELECT id FROM v1 + | GROUP BY id HAVING COUNT(DISTINCT name) > 1 + | ) l2 ON l1.id = l2.id GROUP BY l1.name, l1.id; + |""".stripMargin), + Seq(Row(1), Row(1))) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org