This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.1 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new de68152f01c [SPARK-39887][SQL][3.1] RemoveRedundantAliases should keep aliases that make the output of projection nodes unique
de68152f01c is described below

commit de68152f01c13ff69d61dca31db1a516e7145bfe
Author: Peter Toth <pt...@cloudera.com>
AuthorDate: Mon Aug 15 21:45:01 2022 +0800

[SPARK-39887][SQL][3.1] RemoveRedundantAliases should keep aliases that make the output of projection nodes unique

### What changes were proposed in this pull request?

Keep the output attributes of a `Union` node's first child in the `RemoveRedundantAliases` rule to avoid correctness issues.

### Why are the changes needed?

To fix the result of the following query:
```
SELECT a, b AS a FROM (
  SELECT a, a AS b FROM (SELECT a FROM VALUES (1) AS t(a))
  UNION ALL
  SELECT a, b FROM (SELECT a, b FROM VALUES (1, 2) AS t(a, b))
)
```
Before this PR the query returns the incorrect result:
```
+---+---+
|  a|  a|
+---+---+
|  1|  1|
|  2|  2|
+---+---+
```
After this PR it returns the expected result:
```
+---+---+
|  a|  a|
+---+---+
|  1|  1|
|  1|  2|
+---+---+
```

### Does this PR introduce _any_ user-facing change?

Yes, fixes a correctness issue.

### How was this patch tested?

Added new UTs.

Closes #37496 from peter-toth/SPARK-39887-keep-attributes-of-unions-first-child-3.1.
Authored-by: Peter Toth <pt...@cloudera.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 27 +++++++--- .../RemoveRedundantAliasAndProjectSuite.scala | 2 +- .../org/apache/spark/sql/DataFrameSuite.scala | 61 ++++++++++++++++++++++ .../sql/execution/metric/SQLMetricsSuite.scala | 5 +- 4 files changed, 86 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index e5f531ff2f5..03e50e5c386 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -429,9 +429,11 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { } /** - * Remove redundant alias expression from a LogicalPlan and its subtree. A set of excludes is used - * to prevent the removal of seemingly redundant aliases used to deduplicate the input for a - * (self) join or to prevent the removal of top-level subquery attributes. + * Remove redundant alias expression from a LogicalPlan and its subtree. + * A set of excludes is used to prevent the removal of: + * - seemingly redundant aliases used to deduplicate the input for a (self) join, + * - top-level subquery attributes and + * - attributes of a Union's first child */ private def removeRedundantAliases(plan: LogicalPlan, excluded: AttributeSet): LogicalPlan = { plan match { @@ -455,6 +457,22 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { }) Join(newLeft, newRight, joinType, newCondition, hint) + case u: Union => + var first = true + plan.mapChildren { child => + if (first) { + first = false + // `Union` inherits its first child's outputs. We don't remove those aliases from the + // first child's tree that prevent aliased attributes to appear multiple times in the + // `Union`'s output. 
A parent projection node on the top of an `Union` with non-unique + // output attributes could return incorrect result. + removeRedundantAliases(child, excluded ++ child.outputSet) + } else { + // We don't need to exclude those attributes that `Union` inherits from its first child. + removeRedundantAliases(child, excluded -- u.children.head.outputSet) + } + } + case _ => // Remove redundant aliases in the subtree(s). val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)] @@ -464,9 +482,6 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { newChild } - // Create the attribute mapping. Note that the currentNextAttrPairs can contain duplicate - // keys in case of Union (this is caused by the PushProjectionThroughUnion rule); in this - // case we use the first mapping (which should be provided by the first child). val mapping = AttributeMap(currentNextAttrPairs.toSeq) // Create a an expression cleaning function for nodes that can actually produce redundant diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala index 2e0ab7f64f4..c09ff39a7ae 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala @@ -97,7 +97,7 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper val r2 = LocalRelation('b.int) val query = r1.select('a as 'a).union(r2.select('b as 'b)).select('a).analyze val optimized = Optimize.execute(query) - val expected = r1.union(r2) + val expected = r1.select($"a" as "a").union(r2).analyze comparePlans(optimized, expected) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 78dbddc7494..5523c278e0c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2779,6 +2779,67 @@ class DataFrameSuite extends QueryTest } } } + + test("SPARK-39887: RemoveRedundantAliases should keep attributes of a Union's first child") { + val df = sql( + """ + |SELECT a, b AS a FROM ( + | SELECT a, a AS b FROM (SELECT a FROM VALUES (1) AS t(a)) + | UNION ALL + | SELECT a, b FROM (SELECT a, b FROM VALUES (1, 2) AS t(a, b)) + |) + |""".stripMargin) + val stringCols = df.logicalPlan.output.map(Column(_).cast(StringType)) + val castedDf = df.select(stringCols: _*) + checkAnswer(castedDf, Row("1", "1") :: Row("1", "2") :: Nil) + } + + test("SPARK-39887: RemoveRedundantAliases should keep attributes of a Union's first child 2") { + val df = sql( + """ + |SELECT + | to_date(a) a, + | to_date(b) b + |FROM + | ( + | SELECT + | a, + | a AS b + | FROM + | ( + | SELECT + | to_date(a) a + | FROM + | VALUES + | ('2020-02-01') AS t1(a) + | GROUP BY + | to_date(a) + | ) t3 + | UNION ALL + | SELECT + | a, + | b + | FROM + | ( + | SELECT + | to_date(a) a, + | to_date(b) b + | FROM + | VALUES + | ('2020-01-01', '2020-01-02') AS t1(a, b) + | GROUP BY + | to_date(a), + | to_date(b) + | ) t4 + | ) t5 + |GROUP BY + | to_date(a), + | to_date(b); + |""".stripMargin) + checkAnswer(df, + Row(java.sql.Date.valueOf("2020-02-01"), java.sql.Date.valueOf("2020-02-01")) :: + Row(java.sql.Date.valueOf("2020-01-01"), java.sql.Date.valueOf("2020-01-02")) :: Nil) + } } case class GroupByKey(a: Int, b: Int) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 6628567b9a3..2cecf33a6e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -597,8 +597,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils val union = view.union(view) testSparkPlanMetrics(union, 1, Map( 0L -> ("Union" -> Map()), - 1L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)), - 2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)))) + 1L -> ("Project" -> Map()), + 2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)), + 3L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)))) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org