This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c43460c  [SPARK-32753][SQL] Only copy tags to node with no tags
c43460c is described below

commit c43460cf82a075fd071717489798cde6a61b8515
Author: manuzhang <owenzhang1...@gmail.com>
AuthorDate: Mon Sep 7 16:08:57 2020 +0000

    [SPARK-32753][SQL] Only copy tags to node with no tags
    
    ### What changes were proposed in this pull request?
    Only copy tags to node with no tags when transforming plans.
    
    ### Why are the changes needed?
    cloud-fan [made a good 
point](https://github.com/apache/spark/pull/29593#discussion_r482013121) that 
it doesn't make sense to append tags to existing nodes when nodes are removed. 
That will cause such bugs as duplicate rows when deduplicating and 
repartitioning by the same column with AQE.
    
    ```
    spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
    val df = spark.sql("select id from v1 group by id distribute by id")
    println(df.collect().toArray.mkString(","))
    println(df.queryExecution.executedPlan)
    
    // With AQE
    
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
    AdaptiveSparkPlan(isFinalPlan=true)
    +- CustomShuffleReader local
       +- ShuffleQueryStage 0
          +- Exchange hashpartitioning(id#183L, 10), true
             +- *(3) HashAggregate(keys=[id#183L], functions=[], 
output=[id#183L])
                +- Union
                   :- *(1) Range (0, 10, step=1, splits=2)
                   +- *(2) Range (0, 10, step=1, splits=2)
    
    // Without AQE
    [4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
    *(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
    +- Exchange hashpartitioning(id#206L, 10), true
       +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
          +- Union
             :- *(1) Range (0, 10, step=1, splits=2)
             +- *(2) Range (0, 10, step=1, splits=2)
    ```
    
    It's too expensive to detect node removal so we make a compromise only to 
copy tags to node with no tags.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Fix a bug.
    
    ### How was this patch tested?
    Add test.
    
    Closes #29593 from manuzhang/spark-32753.
    
    Authored-by: manuzhang <owenzhang1...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala     |  7 ++++++-
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala    | 14 ++++++++++++++
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 616572d..8003012 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -92,7 +92,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
   private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
 
   protected def copyTagsFrom(other: BaseType): Unit = {
-    tags ++= other.tags
+    // SPARK-32753: it only makes sense to copy tags to a new node
+    // but it's too expensive to detect other cases likes node removal
+    // so we make a compromise here to copy tags to node with no tags
+    if (tags.isEmpty) {
+      tags ++= other.tags
+    }
   }
 
   def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index f892e66..628bafa 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1224,4 +1224,18 @@ class AdaptiveQueryExecSuite
       })
     }
   }
+
+  test("SPARK-32753: Only copy tags to node with no tags") {
+    withSQLConf(
+      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true"
+    ) {
+      spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
+
+      val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+        "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id")
+      assert(collect(adaptivePlan) {
+        case s: ShuffleExchangeExec => s
+      }.length == 1)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to