This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9b39e4b  [SPARK-32753][SQL][3.0] Only copy tags to node with no tags
9b39e4b is described below

commit 9b39e4b7aefcedf09764b3528a32bdf0b77e331b
Author: manuzhang <owenzhang1...@gmail.com>
AuthorDate: Tue Sep 8 13:36:05 2020 +0000

    [SPARK-32753][SQL][3.0] Only copy tags to node with no tags
    
    This PR backports https://github.com/apache/spark/pull/29593 to branch-3.0
    
    ### What changes were proposed in this pull request?
    Only copy tags to a node with no tags when transforming plans.
    
    ### Why are the changes needed?
    cloud-fan [made a good point](https://github.com/apache/spark/pull/29593#discussion_r482013121) that it doesn't make sense to append tags to existing nodes when nodes are removed. That can cause bugs such as duplicate rows when deduplicating and repartitioning by the same column with AQE.
    
    ```
    spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
    val df = spark.sql("select id from v1 group by id distribute by id")
    println(df.collect().toArray.mkString(","))
    println(df.queryExecution.executedPlan)
    
    // With AQE
    [4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
    AdaptiveSparkPlan(isFinalPlan=true)
    +- CustomShuffleReader local
       +- ShuffleQueryStage 0
          +- Exchange hashpartitioning(id#183L, 10), true
             +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
                +- Union
                   :- *(1) Range (0, 10, step=1, splits=2)
                   +- *(2) Range (0, 10, step=1, splits=2)
    
    // Without AQE
    [4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
    *(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
    +- Exchange hashpartitioning(id#206L, 10), true
       +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
          +- Union
             :- *(1) Range (0, 10, step=1, splits=2)
             +- *(2) Range (0, 10, step=1, splits=2)
    ```
    
    Note that in the buggy AQE plan above, the final `HashAggregate` on top of the exchange is gone, which is why the duplicate rows survive. It's too expensive to detect node removal, so we make a compromise and only copy tags to nodes that have no tags.
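
    For illustration, here is a minimal self-contained sketch of the guarded copy. `Tag` and `Node` are simplified, hypothetical stand-ins; the actual change to `TreeNode.copyTagsFrom` is in the diff below.

    ```
    import scala.collection.mutable

    // Simplified stand-in for org.apache.spark.sql.catalyst.trees.TreeNodeTag
    case class Tag[T](name: String)

    class Node {
      private val tags: mutable.Map[Tag[_], Any] = mutable.Map.empty

      def setTagValue[T](tag: Tag[T], value: T): Unit = tags(tag) = value

      def getTagValue[T](tag: Tag[T]): Option[T] =
        tags.get(tag).map(_.asInstanceOf[T])

      // SPARK-32753: copy tags only into a node that has none yet, so a
      // transform can no longer append a removed node's tags onto a
      // surviving node that already carries its own.
      def copyTagsFrom(other: Node): Unit = {
        if (tags.isEmpty) {
          tags ++= other.tags
        }
      }
    }
    ```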
    
    ### Does this PR introduce any user-facing change?
    Yes. It fixes a bug.
    
    ### How was this patch tested?
    Added a test.
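
    A condensed sketch of the new test's core assertion (the full test is in the diff below; `collect` is assumed to be the plan-folding helper from `AdaptiveSparkPlanHelper` that this suite mixes in):

    ```
    val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
      "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id")
    // GROUP BY and DISTRIBUTE BY use the same column, so one hash
    // partitioning satisfies both and the final plan needs exactly
    // one ShuffleExchangeExec.
    assert(collect(adaptivePlan) {
      case s: ShuffleExchangeExec => s
    }.length == 1)
    ```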
    
    Closes #29665 from manuzhang/spark-32753-3.0.
    
    Authored-by: manuzhang <owenzhang1...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/trees/TreeNode.scala   |  7 ++++++-
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala  | 16 +++++++++++++++-
 2 files changed, 21 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index c4a1067..4c74742 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -91,7 +91,12 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
 
   protected def copyTagsFrom(other: BaseType): Unit = {
-    tags ++= other.tags
+    // SPARK-32753: it only makes sense to copy tags to a new node
+    // but it's too expensive to detect other cases like node removal
+    // so we make a compromise here to copy tags to node with no tags
+    if (tags.isEmpty) {
+      tags ++= other.tags
+    }
   }
 
   def setTagValue[T](tag: TreeNodeTag[T], value: T): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index e18adbd..6d97a6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
 import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.OptimizeLocalShuffleReader.LOCAL_SHUFFLE_READER_DESCRIPTION
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BuildRight, SortMergeJoinExec}
 import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate
 import org.apache.spark.sql.functions._
@@ -889,4 +889,18 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-32753: Only copy tags to node with no tags") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      withTempView("v1") {
+        spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
+
+        val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
+          "SELECT id FROM v1 GROUP BY id DISTRIBUTE BY id")
+        assert(collect(adaptivePlan) {
+          case s: ShuffleExchangeExec => s
+        }.length == 1)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
