This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new e8045fb [SPARK-28344][SQL][FOLLOW-UP] Check the ambiguous self-join only if there is a join in the plan e8045fb is described below commit e8045fb99cc463889aebcda1c8c1daa2042f4319 Author: HyukjinKwon <gurwls...@apache.org> AuthorDate: Mon Jun 1 16:31:39 2020 -0700 [SPARK-28344][SQL][FOLLOW-UP] Check the ambiguous self-join only if there is a join in the plan ### What changes were proposed in this pull request? This PR proposes to run the `DetectAmbiguousSelfJoin` check only if there is a `Join` in the plan. Currently, the check is too strict and is applied even to non-join queries. For example, the code below has no join at all, but it fails with an ambiguous self-join error: ```scala import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.sum val df = Seq(1, 1, 2, 2).toDF("A") val w = Window.partitionBy(df("A")) df.select(df("A").alias("X"), sum(df("A")).over(w)).explain(true) ``` This is because `ExtractWindowExpressions` can create an `AttributeReference` with the same metadata but a different expression ID, see: https://github.com/apache/spark/blob/0fd98abd859049dc3b200492487041eeeaa8f737/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2679 https://github.com/apache/spark/blob/71c73d58f6e88d2558ed2e696897767d93bac60f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala#L63 https://github.com/apache/spark/blob/5945d46c11a86fd85f9e65f24c2e88f368eee01f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L180 Before: ``` 'Project [A#19 AS X#21, sum(A#19) windowspecdefinition(A#19, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS sum(A) OVER (PARTITION BY A unspecifiedframe$())#23L] +- Relation[A#19] parquet ``` After: ``` Project [X#21, sum(A) OVER (PARTITION BY A unspecifiedframe$())#23L] +- Project [X#21, A#19, sum(A) OVER (PARTITION 
BY A unspecifiedframe$())#23L, sum(A) OVER (PARTITION BY A unspecifiedframe$())#23L] +- Window [sum(A#19) windowspecdefinition(A#19, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS sum(A) OVER (PARTITION BY A unspecifiedframe$())#23L], [A#19] +- Project [A#19 AS X#21, A#19] +- Relation[A#19] parquet ``` `X#21` holds the same DataFrame ID and column position metadata as `A#19`, but it has a different expression ID, which causes the check to fail. ### Why are the changes needed? To loosen the check so that users are not surprised by failures on queries that contain no join. ### Does this PR introduce _any_ user-facing change? The changes affect unreleased branches only. ### How was this patch tested? Manually tested, and a unit test was added. Closes #28695 from HyukjinKwon/SPARK-28344-followup. Authored-by: HyukjinKwon <gurwls...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit ea45fc51921e64302b9220b264156bb4f757fe01) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../sql/execution/analysis/DetectAmbiguousSelfJoin.scala | 6 ++++++ .../scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala | 12 ++++++++++++ 2 files changed, 18 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala index 614d6c2..136f7c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/analysis/DetectAmbiguousSelfJoin.scala @@ -76,6 +76,8 @@ class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan] { // We always remove the special metadata from `AttributeReference` at the end of this rule, so // Dataset column reference only exists in the root node via Dataset transformations like // `Dataset#select`. 
+ if (plan.find(_.isInstanceOf[Join]).isEmpty) return stripColumnReferenceMetadataInPlan(plan) + val colRefAttrs = plan.expressions.flatMap(_.collect { case a: AttributeReference if isColumnReference(a) => a }) @@ -153,6 +155,10 @@ class DetectAmbiguousSelfJoin(conf: SQLConf) extends Rule[LogicalPlan] { } } + stripColumnReferenceMetadataInPlan(plan) + } + + private def stripColumnReferenceMetadataInPlan(plan: LogicalPlan): LogicalPlan = { plan.transformExpressions { case a: AttributeReference if isColumnReference(a) => // Remove the special metadata from this `AttributeReference`, as the detection is done. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala index 250ec7d..fb58c98 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSelfJoinSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql +import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions.{count, sum} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -202,4 +203,15 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession { assertAmbiguousSelfJoin(df1.join(df4).join(df2).select(df2("id"))) } } + + test("SPARK-28344: don't fail as ambiguous self join when there is no join") { + withSQLConf( + SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true") { + val df = Seq(1, 1, 2, 2).toDF("a") + val w = Window.partitionBy(df("a")) + checkAnswer( + df.select(df("a").alias("x"), sum(df("a")).over(w)), + Seq((1, 2), (1, 2), (2, 4), (2, 4)).map(Row.fromTuple)) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org