This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new de68152f01c [SPARK-39887][SQL][3.1] RemoveRedundantAliases should keep 
aliases that make the output of projection nodes unique
de68152f01c is described below

commit de68152f01c13ff69d61dca31db1a516e7145bfe
Author: Peter Toth <pt...@cloudera.com>
AuthorDate: Mon Aug 15 21:45:01 2022 +0800

    [SPARK-39887][SQL][3.1] RemoveRedundantAliases should keep aliases that 
make the output of projection nodes unique
    
    ### What changes were proposed in this pull request?
    Keep the output attributes of a `Union` node's first child in the 
`RemoveRedundantAliases` rule to avoid correctness issues.
    
    ### Why are the changes needed?
    To fix the result of the following query:
    ```
    SELECT a, b AS a FROM (
      SELECT a, a AS b FROM (SELECT a FROM VALUES (1) AS t(a))
      UNION ALL
      SELECT a, b FROM (SELECT a, b FROM VALUES (1, 2) AS t(a, b))
    )
    ```
    Before this PR, the query returns an incorrect result:
    ```
    +---+---+
    |  a|  a|
    +---+---+
    |  1|  1|
    |  2|  2|
    +---+---+
    ```
    After this PR it returns the expected result:
    ```
    +---+---+
    |  a|  a|
    +---+---+
    |  1|  1|
    |  1|  2|
    +---+---+
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, fixes a correctness issue.
    
    ### How was this patch tested?
    Added new UTs.
    
    Closes #37496 from 
peter-toth/SPARK-39887-keep-attributes-of-unions-first-child-3.1.
    
    Authored-by: Peter Toth <pt...@cloudera.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 27 +++++++---
 .../RemoveRedundantAliasAndProjectSuite.scala      |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala      | 61 ++++++++++++++++++++++
 .../sql/execution/metric/SQLMetricsSuite.scala     |  5 +-
 4 files changed, 86 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index e5f531ff2f5..03e50e5c386 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -429,9 +429,11 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
   }
 
   /**
-   * Remove redundant alias expression from a LogicalPlan and its subtree. A 
set of excludes is used
-   * to prevent the removal of seemingly redundant aliases used to deduplicate 
the input for a
-   * (self) join or to prevent the removal of top-level subquery attributes.
+   * Remove redundant alias expression from a LogicalPlan and its subtree.
+   * A set of excludes is used to prevent the removal of:
+   * - seemingly redundant aliases used to deduplicate the input for a (self) 
join,
+   * - top-level subquery attributes and
+   * - attributes of a Union's first child
    */
   private def removeRedundantAliases(plan: LogicalPlan, excluded: AttributeSet): LogicalPlan = {
     plan match {
@@ -455,6 +457,22 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
         })
         Join(newLeft, newRight, joinType, newCondition, hint)
 
+      case u: Union =>
+        var first = true
+        plan.mapChildren { child =>
+          if (first) {
+            first = false
+            // `Union` inherits its first child's outputs. We don't remove those aliases from the
+            // first child's tree that prevent aliased attributes from appearing multiple times in
+            // the `Union`'s output. A parent projection node on top of a `Union` with non-unique
+            // output attributes could return an incorrect result.
+            removeRedundantAliases(child, excluded ++ child.outputSet)
+          } else {
+            // We don't need to exclude those attributes that `Union` inherits from its first child.
+            removeRedundantAliases(child, excluded -- u.children.head.outputSet)
+          }
+        }
+
       case _ =>
         // Remove redundant aliases in the subtree(s).
         val currentNextAttrPairs = mutable.Buffer.empty[(Attribute, Attribute)]
@@ -464,9 +482,6 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
           newChild
         }
 
-        // Create the attribute mapping. Note that the currentNextAttrPairs 
can contain duplicate
-        // keys in case of Union (this is caused by the 
PushProjectionThroughUnion rule); in this
-        // case we use the first mapping (which should be provided by the 
first child).
         val mapping = AttributeMap(currentNextAttrPairs.toSeq)
 
         // Create a an expression cleaning function for nodes that can 
actually produce redundant
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
index 2e0ab7f64f4..c09ff39a7ae 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala
@@ -97,7 +97,7 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest 
with PredicateHelper
     val r2 = LocalRelation('b.int)
     val query = r1.select('a as 'a).union(r2.select('b as 
'b)).select('a).analyze
     val optimized = Optimize.execute(query)
-    val expected = r1.union(r2)
+    val expected = r1.select($"a" as "a").union(r2).analyze
     comparePlans(optimized, expected)
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 78dbddc7494..5523c278e0c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2779,6 +2779,67 @@ class DataFrameSuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-39887: RemoveRedundantAliases should keep attributes of a Union's first child") {
+    val df = sql(
+      """
+        |SELECT a, b AS a FROM (
+        |  SELECT a, a AS b FROM (SELECT a FROM VALUES (1) AS t(a))
+        |  UNION ALL
+        |  SELECT a, b FROM (SELECT a, b FROM VALUES (1, 2) AS t(a, b))
+        |)
+        |""".stripMargin)
+    val stringCols = df.logicalPlan.output.map(Column(_).cast(StringType))
+    val castedDf = df.select(stringCols: _*)
+    checkAnswer(castedDf, Row("1", "1") :: Row("1", "2") :: Nil)
+  }
+
+  test("SPARK-39887: RemoveRedundantAliases should keep attributes of a Union's first child 2") {
+    val df = sql(
+      """
+        |SELECT
+        |  to_date(a) a,
+        |  to_date(b) b
+        |FROM
+        |  (
+        |    SELECT
+        |      a,
+        |      a AS b
+        |    FROM
+        |      (
+        |        SELECT
+        |          to_date(a) a
+        |        FROM
+        |        VALUES
+        |          ('2020-02-01') AS t1(a)
+        |        GROUP BY
+        |          to_date(a)
+        |      ) t3
+        |    UNION ALL
+        |    SELECT
+        |      a,
+        |      b
+        |    FROM
+        |      (
+        |        SELECT
+        |          to_date(a) a,
+        |          to_date(b) b
+        |        FROM
+        |        VALUES
+        |          ('2020-01-01', '2020-01-02') AS t1(a, b)
+        |        GROUP BY
+        |          to_date(a),
+        |          to_date(b)
+        |      ) t4
+        |  ) t5
+        |GROUP BY
+        |  to_date(a),
+        |  to_date(b);
+        |""".stripMargin)
+    checkAnswer(df,
+      Row(java.sql.Date.valueOf("2020-02-01"), 
java.sql.Date.valueOf("2020-02-01")) ::
+        Row(java.sql.Date.valueOf("2020-01-01"), 
java.sql.Date.valueOf("2020-01-02")) :: Nil)
+  }
 }
 
 case class GroupByKey(a: Int, b: Int)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 6628567b9a3..2cecf33a6e8 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -597,8 +597,9 @@ class SQLMetricsSuite extends SharedSparkSession with 
SQLMetricsTestUtils
       val union = view.union(view)
       testSparkPlanMetrics(union, 1, Map(
         0L -> ("Union" -> Map()),
-        1L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)),
-        2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L))))
+        1L -> ("Project" -> Map()),
+        2L -> ("LocalTableScan" -> Map("number of output rows" -> 2L)),
+        3L -> ("LocalTableScan" -> Map("number of output rows" -> 2L))))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to