This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 76e4425efc2 [SPARK-42753] ReusedExchange refers to non-existent nodes
76e4425efc2 is described below

commit 76e4425efc22218fba04ad0aba8e6c1f6bb4954a
Author: Steven Chen <steven.c...@databricks.com>
AuthorDate: Wed Mar 22 09:28:18 2023 +0800

    [SPARK-42753] ReusedExchange refers to non-existent nodes
    
    ### What changes were proposed in this pull request?
    
    This PR addresses a rare bug with the EXPLAIN function and Spark UI that 
can happen when AQE takes effect with multiple ReusedExchange nodes. The bug 
causes the ReusedExchange to point to an unknown child since that child subtree 
was "pruned" in a previous AQE iteration.
    
    This PR fixes the issue by finding all the ReusedExchange nodes in the tree 
that have a `child` node that has NOT been processed in the final plan (meaning 
it has no ID or it has an incorrect ID generated from the previous AQE 
iteration). It then traverses the child subtree and generates correct IDs for 
them. We print this missing subtree in a new section called `Adaptively 
Optimized Out Exchanges`.
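    
    The two-pass idea described above can be sketched outside Spark. This is a minimal illustration, not the code in this patch: `Node`, `Leaf`, `Exchange`, `Reused`, `Join`, `assignIds` and `process` are hypothetical stand-ins, and the only real API used is the JDK's `Collections.newSetFromMap` over an `IdentityHashMap`. It assumes a post-order ID assignment, which matches the numbering shown in the unit tests.
    
    ```scala
    import java.util.{Collections, IdentityHashMap}
    import scala.collection.mutable.ArrayBuffer

    object ReusedExchangeIdSketch {
      // Hypothetical stand-ins for physical plan nodes (not Spark classes).
      sealed trait Node {
        var opId: Option[Int] = None
        def children: Seq[Node]
      }
      case class Leaf(name: String) extends Node { def children: Seq[Node] = Nil }
      case class Exchange(child: Node) extends Node { def children: Seq[Node] = Seq(child) }
      // The reused exchange is only referenced; it is not a tree child.
      case class Reused(target: Exchange) extends Node { def children: Seq[Node] = Nil }
      case class Join(left: Node, right: Node) extends Node {
        def children: Seq[Node] = Seq(left, right)
      }

      // Post-order ID assignment. `visited` is an identity set, so structurally
      // equal but distinct nodes each receive their own ID.
      def assignIds(
          root: Node,
          start: Int,
          visited: java.util.Set[Node],
          reused: ArrayBuffer[Reused],
          collect: Boolean): Int = {
        var id = start
        def visit(n: Node): Unit = {
          n.children.foreach(visit)
          if (!visited.contains(n)) {
            n match {
              case r: Reused if collect => reused += r
              case _ =>
            }
            visited.add(n)
            id += 1
            n.opId = Some(id)
          }
        }
        visit(root)
        id
      }

      // First pass: number the final plan. Second pass: number every reused
      // exchange that the first pass never reached, and return those exchanges.
      def process(root: Node): Seq[Exchange] = {
        val visited = Collections.newSetFromMap(new IdentityHashMap[Node, java.lang.Boolean]())
        val reused = ArrayBuffer.empty[Reused]
        var id = assignIds(root, 0, visited, reused, collect = true)
        val optimizedOut = ArrayBuffer.empty[Exchange]
        reused.foreach { r =>
          if (!visited.contains(r.target)) {
            optimizedOut += r.target
            // collect = false: do not pick up nested reused nodes in this subtree.
            id = assignIds(r.target, id, visited, reused, collect = false)
          }
        }
        optimizedOut.toSeq
      }
    }
    ```
    
    In this sketch, two `Reused` nodes that point at the same `Exchange` instance yield a single optimized-out subtree, while two structurally equal but distinct `Exchange` instances are numbered and returned separately, because membership is decided by reference rather than by `equals`.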
    
    ### Why are the changes needed?
    
    Below is an example to demonstrate the root cause:
    
    > AdaptiveSparkPlan
    >   |-- SomeNode X (subquery xxx)
    >       |-- Exchange A
    >           |-- SomeNode Y
    >               |-- Exchange B
    >
    > Subquery:Hosting operator = SomeNode Hosting Expression = xxx 
dynamicpruning#388
    > AdaptiveSparkPlan
    >   |-- SomeNode M
    >       |-- Exchange C
    >           |-- SomeNode N
    >               |-- Exchange D
    >
    
    Step 1: Exchange B is materialized and the QueryStage is added to stage 
cache
    
    Step 2: Exchange D reuses Exchange B
    
    Step 3: Exchange C is materialized and the QueryStage is added to stage 
cache
    
    Step 4: Exchange A reuses Exchange C
    
    Then the final plan looks like:
    
    > AdaptiveSparkPlan
    >   |-- SomeNode X (subquery xxx)
    >       |-- Exchange A -> ReusedExchange (reuses Exchange C)
    >
    >
    > Subquery:Hosting operator = SomeNode Hosting Expression = xxx 
dynamicpruning#388
    > AdaptiveSparkPlan
    >   |-- SomeNode M
    >       |-- Exchange C -> PhotonShuffleMapStage ....
    >           |-- SomeNode N
    >               |-- Exchange D -> ReusedExchange (reuses Exchange B)
    >
    
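    As a result, the ReusedExchange (reuses Exchange B) refers to a 
non-existent node, because Exchange B belonged to the subtree under Exchange A 
that is no longer part of the final plan.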
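    
    The four steps above can be replayed on a toy plan to see where the dangling reference comes from. This is only a sketch under simplifying assumptions: `Plan`, `Op`, `Exchange`, `Reused` and `dangling` are hypothetical names, the stage cache is not modeled, and the plans are written directly in their final shape.
    
    ```scala
    object DanglingReuseSketch {
      // Hypothetical, simplified plan model (not Spark classes).
      sealed trait Plan { def children: Seq[Plan] }
      case class Op(name: String, children: Seq[Plan]) extends Plan
      case class Exchange(name: String, children: Seq[Plan]) extends Plan
      case class Reused(target: Exchange) extends Plan { def children: Seq[Plan] = Nil }

      // Exchanges that physically appear in a plan tree.
      def exchanges(p: Plan): Seq[Exchange] = p match {
        case e: Exchange => e +: e.children.flatMap(exchanges)
        case other => other.children.flatMap(exchanges)
      }

      // Exchanges that some Reused node in the tree points at.
      def reuseTargets(p: Plan): Seq[Exchange] = p match {
        case Reused(t) => Seq(t)
        case other => other.children.flatMap(reuseTargets)
      }

      // Names of reuse targets that appear in none of the given plans.
      def dangling(roots: Seq[Plan]): Seq[String] = {
        val present = roots.flatMap(exchanges)
        roots.flatMap(reuseTargets).filterNot(t => present.exists(_ eq t)).map(_.name)
      }

      // Exchange B originally sat below Exchange A in the main query.
      val exchangeB = Exchange("B", Nil)
      // Step 2: Exchange D in the subquery is replaced by a reuse of B.
      val exchangeC = Exchange("C", Seq(Op("N", Seq(Reused(exchangeB)))))
      // Step 4: Exchange A is replaced by a reuse of C, dropping B's subtree.
      val mainPlan: Plan = Op("X", Seq(Reused(exchangeC)))
      val subqueryPlan: Plan = Op("M", Seq(exchangeC))
    }
    ```
    
    Calling `DanglingReuseSketch.dangling(Seq(mainPlan, subqueryPlan))` on these toy plans reports only Exchange B: Exchange C is still present in the subquery, but nothing in either final plan contains B any more.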
    
    ### Does this PR introduce _any_ user-facing change?
    
    **Explain Text Before and After**
    **Before:**
    ```
    +- ReusedExchange (105)
    
    (105) ReusedExchange [Reuses operator id: unknown]
    Output [3]: [sr_customer_sk#303, sr_store_sk#307, sum#413L]
    ```
    
    **After:**
    ```
    +- ReusedExchange (105)
       +- Exchange (132)
          +- * HashAggregate (131)
             +- * Project (130)
                +- * BroadcastHashJoin Inner BuildRight (129)
                   :- * Filter (128)
                   :  +- * ColumnarToRow (127)
                   :     +- Scan parquet 
hive_metastore.tpcds_sf1000_delta.store_returns (126)
                   +- ShuffleQueryStage (115), Statistics(sizeInBytes=5.7 KiB, 
rowCount=366, [d_date_sk#234 -> 
ColumnStat(Some(362),Some(2415022),Some(2488070),Some(0),Some(4),Some(4),None,2)],
 isRuntime=true)
                      +- ReusedExchange (114)
    
    (105) ReusedExchange [Reuses operator id: 132]
    Output [3]: [sr_customer_sk#217, sr_store_sk#221, sum#327L]
    
    (126) Scan parquet hive_metastore.tpcds_sf1000_delta.store_returns
    Output [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, 
sr_returned_date_sk#214]
    Batched: true
    Location: PreparedDeltaFileIndex 
[dbfs:/mnt/performance-datasets/2018TPC/tpcds-2.4/sf1000_delta/store_returns]
    PartitionFilters: [isnotnull(sr_returned_date_sk#214), 
dynamicpruningexpression(sr_returned_date_sk#214 IN dynamicpruning#329)]
    PushedFilters: [IsNotNull(sr_store_sk)]
    ReadSchema: 
struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)>
    
    (127) ColumnarToRow
    Input [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, 
sr_returned_date_sk#214]
    
    (128) Filter
    Input [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, 
sr_returned_date_sk#214]
    Condition : isnotnull(sr_store_sk#221)
    
    (114) ReusedExchange [Reuses operator id: 8]
    Output [1]: [d_date_sk#234]
    
    (115) ShuffleQueryStage
    Output [1]: [d_date_sk#234]
    Arguments: 2, Statistics(sizeInBytes=5.7 KiB, rowCount=366, [d_date_sk#234 
-> 
ColumnStat(Some(362),Some(2415022),Some(2488070),Some(0),Some(4),Some(4),None,2)],
 isRuntime=true)
    
    (129) BroadcastHashJoin
    Left keys [1]: [sr_returned_date_sk#214]
    Right keys [1]: [d_date_sk#234]
    Join type: Inner
    Join condition: None
    
    (130) Project
    Output [3]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225]
    Input [5]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, 
sr_returned_date_sk#214, d_date_sk#234]
    
    (131) HashAggregate
    Input [3]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225]
    Keys [2]: [sr_customer_sk#217, sr_store_sk#221]
    Functions [1]: [partial_sum(UnscaledValue(sr_return_amt#225)) AS sum#327L]
    Aggregate Attributes [1]: [sum#326L]
    Results [3]: [sr_customer_sk#217, sr_store_sk#221, sum#327L]
    
    (132) Exchange
    Input [3]: [sr_customer_sk#217, sr_store_sk#221, sum#327L]
    Arguments: hashpartitioning(sr_store_sk#221, 200), ENSURE_REQUIREMENTS, 
[plan_id=1791]
    ```
    
    **Spark UI Before and After**
    
    **Before:**
    <img width="339" alt="Screenshot 2023-03-10 at 10 52 46 AM" 
src="https://user-images.githubusercontent.com/83618776/224406011-e622ad11-37e6-48c6-b556-cd5c7708e237.png">
    
    **After:**
    
![image](https://user-images.githubusercontent.com/83618776/224406076-4fcbf918-2a8d-4776-b91a-36815752cf2a.png)
    
    ### How was this patch tested?
    
    Unit tests were added to `ExplainSuite`, and the change was also tested 
manually with ExplainSuite.
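    
    The added tests compare EXPLAIN output as plain strings, so run-specific numbers have to be masked first. A minimal sketch of that normalization, using the same two regular expressions as the tests in this patch; the object and method names are hypothetical and only `String.replaceAll` is relied on:
    
    ```scala
    object ExplainNormalizeSketch {
      // Mask expression IDs (e.g. id#12L) and plan IDs (e.g. plan_id=34),
      // which vary between runs, so the text can be compared verbatim.
      def normalize(explain: String): String =
        explain.replaceAll("#\\d+", "#x").replaceAll("plan_id=\\d+", "plan_id=x")
    }
    ```
    
    Operator IDs such as `(3)` are deliberately left untouched, since they are exactly what the tests assert on.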
    
    Closes #40385 from StevenChenDatabricks/fix-reused.
    
    Authored-by: Steven Chen <steven.c...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/ExplainUtils.scala  |  81 ++++++++--
 .../scala/org/apache/spark/sql/ExplainSuite.scala  | 165 +++++++++++++++++++++
 2 files changed, 237 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index 12ffbc8554e..3da3e646f36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -17,12 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Collections.newSetFromMap
+import java.util.IdentityHashMap
+import java.util.Set
+
 import scala.collection.mutable.{ArrayBuffer, BitSet}
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AdaptiveSparkPlanHelper, QueryStageExec}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
 
 object ExplainUtils extends AdaptiveSparkPlanHelper {
   /**
@@ -73,14 +78,34 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    */
   def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
     try {
+      // Initialize a reference-unique set of Operators to avoid accidental overwrites and to allow
+      // intentional overwriting of IDs generated in previous AQE iteration
+      val operators = newSetFromMap[QueryPlan[_]](new IdentityHashMap())
+      // Initialize an array of ReusedExchanges to help find Adaptively 
Optimized Out
+      // Exchanges as part of SPARK-42753
+      val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]
+
       var currentOperatorID = 0
-      currentOperatorID = generateOperatorIDs(plan, currentOperatorID)
+      currentOperatorID = generateOperatorIDs(plan, currentOperatorID, 
operators, reusedExchanges,
+        true)
 
       val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, 
BaseSubqueryExec)]
       getSubqueries(plan, subqueries)
 
-      subqueries.foldLeft(currentOperatorID) {
-        (curId, plan) => generateOperatorIDs(plan._3.child, curId)
+      currentOperatorID = subqueries.foldLeft(currentOperatorID) {
+        (curId, plan) => generateOperatorIDs(plan._3.child, curId, operators, 
reusedExchanges,
+          true)
+      }
+
+      // SPARK-42753: Process subtree for a ReusedExchange with unknown child
+      val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
+      reusedExchanges.foreach{ reused =>
+        val child = reused.child
+        if (!operators.contains(child)) {
+          optimizedOutExchanges.append(child)
+          currentOperatorID = generateOperatorIDs(child, currentOperatorID, 
operators,
+            reusedExchanges, false)
+        }
       }
 
       val collectedOperators = BitSet.empty
@@ -103,6 +128,17 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
         }
         append("\n")
       }
+
+      i = 0
+      optimizedOutExchanges.foreach{ exchange =>
+        if (i == 0) {
+          append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
+        }
+        i = i + 1
+        append(s"Subplan:$i\n")
+        processPlanSkippingSubqueries[SparkPlan](exchange, append, 
collectedOperators)
+        append("\n")
+      }
     } finally {
       removeTags(plan)
     }
@@ -119,17 +155,40 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    * @param plan Input query plan to process
    * @param startOperatorID The start value of operation id. The subsequent 
operations will be
    *                        assigned higher value.
+   * @param visited A unique set of operators visited by generateOperatorIDs. The set is scoped
+   *                at the callsite function processPlan. It serves two purposes: Firstly, it is
+   *                used to avoid accidentally overwriting existing IDs that were generated in
+   *                the same processPlan call. Secondly, it is used to allow for intentional ID
+   *                overwriting as part of SPARK-42753 where an Adaptively Optimized Out Exchange
+   *                and its subtree may contain IDs that were generated in a previous AQE
+   *                iteration's processPlan call which would result in incorrect IDs.
+   * @param reusedExchanges A unique set of ReusedExchange nodes visited which will be used to
+   *                        identify adaptively optimized out exchanges in SPARK-42753.
+   * @param addReusedExchanges Whether to add ReusedExchange nodes to reusedExchanges set. We set it
+   *                           to false to avoid processing more nested ReusedExchanges nodes in the
+   *                           subtree of an Adaptively Optimized Out Exchange.
    * @return The last generated operation id for this input plan. This is to 
ensure we always
    *         assign incrementing unique id to each operator.
    */
-  private def generateOperatorIDs(plan: QueryPlan[_], startOperatorID: Int): 
Int = {
+  private def generateOperatorIDs(
+      plan: QueryPlan[_],
+      startOperatorID: Int,
+      visited: Set[QueryPlan[_]],
+      reusedExchanges: ArrayBuffer[ReusedExchangeExec],
+      addReusedExchanges: Boolean): Int = {
     var currentOperationID = startOperatorID
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return currentOperationID
     }
 
-    def setOpId(plan: QueryPlan[_]): Unit = if 
(plan.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
+    def setOpId(plan: QueryPlan[_]): Unit = if (!visited.contains(plan)) {
+      plan match {
+        case r: ReusedExchangeExec if addReusedExchanges =>
+          reusedExchanges.append(r)
+        case _ =>
+      }
+      visited.add(plan)
       currentOperationID += 1
       plan.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
     }
@@ -138,18 +197,22 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       case _: WholeStageCodegenExec =>
       case _: InputAdapter =>
       case p: AdaptiveSparkPlanExec =>
-        currentOperationID = generateOperatorIDs(p.executedPlan, 
currentOperationID)
+        currentOperationID = generateOperatorIDs(p.executedPlan, 
currentOperationID, visited,
+          reusedExchanges, addReusedExchanges)
         if (!p.executedPlan.fastEquals(p.initialPlan)) {
-          currentOperationID = generateOperatorIDs(p.initialPlan, 
currentOperationID)
+          currentOperationID = generateOperatorIDs(p.initialPlan, 
currentOperationID, visited,
+            reusedExchanges, addReusedExchanges)
         }
         setOpId(p)
       case p: QueryStageExec =>
-        currentOperationID = generateOperatorIDs(p.plan, currentOperationID)
+        currentOperationID = generateOperatorIDs(p.plan, currentOperationID, 
visited,
+          reusedExchanges, addReusedExchanges)
         setOpId(p)
       case other: QueryPlan[_] =>
         setOpId(other)
         currentOperationID = other.innerChildren.foldLeft(currentOperationID) {
-          (curId, plan) => generateOperatorIDs(plan, curId)
+          (curId, plan) => generateOperatorIDs(plan, curId, visited, 
reusedExchanges,
+            addReusedExchanges)
         }
     }
     currentOperationID
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index a6b295578d6..3ed989c4035 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -17,9 +17,13 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, 
EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
+import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, 
ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.TestOptionsSource
@@ -771,6 +775,167 @@ class ExplainSuiteAE extends ExplainSuiteHelper with 
EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an 
Exchange node that has
+    // no ID. This is a rare edge case that could arise during AQE if there 
are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID 
and gets printed
+    val exchange = ShuffleExchangeExec(RoundRobinPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 
10)))
+    val reused = ReusedExchangeExec(exchange.output, exchange)
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |
+                          |
+                          |(1) ReusedExchange [Reuses operator id: 3]
+                          |Output [1]: [id#xL]
+                          |
+                          |===== Adaptively Optimized Out Exchanges =====
+                          |
+                          |Subplan:1
+                          |Exchange (3)
+                          |+- Range (2)
+                          |
+                          |
+                          |(2) Range
+                          |Output [1]: [id#xL]
+                          |Arguments: Range (0, 1000, step=1, splits=Some(10))
+                          |
+                          |(3) Exchange
+                          |Input [1]: [id#xL]
+                          |Arguments: RoundRobinPartitioning(10), 
ENSURE_REQUIREMENTS, [plan_id=x]
+                          |
+                          |""".stripMargin
+
+    results = results.replaceAll("#\\d+", "#x").replaceAll("plan_id=\\d+", 
"plan_id=x")
+    assert(results == expectedTree)
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with a two ReusedExchange reusing the 
same exchange
+    // Only one exchange node should be printed
+    val exchange = ShuffleExchangeExec(RoundRobinPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 
10)))
+    val reused1 = ReusedExchangeExec(exchange.output, exchange)
+    val reused2 = ReusedExchangeExec(exchange.output, exchange)
+    val join = SortMergeJoinExec(reused1.output, reused2.output, Inner, None, 
reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |+- ReusedExchange (2)
+                          |
+                          |
+                          |(1) ReusedExchange [Reuses operator id: 5]
+                          |Output [1]: [id#xL]
+                          |
+                          |(2) ReusedExchange [Reuses operator id: 5]
+                          |Output [1]: [id#xL]
+                          |
+                          |(3) SortMergeJoin
+                          |Left keys [1]: [id#xL]
+                          |Right keys [1]: [id#xL]
+                          |Join type: Inner
+                          |Join condition: None
+                          |
+                          |===== Adaptively Optimized Out Exchanges =====
+                          |
+                          |Subplan:1
+                          |Exchange (5)
+                          |+- Range (4)
+                          |
+                          |
+                          |(4) Range
+                          |Output [1]: [id#xL]
+                          |Arguments: Range (0, 1000, step=1, splits=Some(10))
+                          |
+                          |(5) Exchange
+                          |Input [1]: [id#xL]
+                          |Arguments: RoundRobinPartitioning(10), 
ENSURE_REQUIREMENTS, [plan_id=x]
+                          |
+                          |""".stripMargin
+    results = results.replaceAll("#\\d+", "#x").replaceAll("plan_id=\\d+", 
"plan_id=x")
+    assert(results == expectedTree)
+  }
+
+  test("SPARK-42753: Correctly separate two ReusedExchange not sharing 
subtree") {
+    // Simulate two ReusedExchanges reusing two different Exchanges that 
appear similar
+    // The two exchanges should have separate IDs and printed separately
+    val exchange1 = ShuffleExchangeExec(RoundRobinPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 
10)))
+    val reused1 = ReusedExchangeExec(exchange1.output, exchange1)
+    val exchange2 = ShuffleExchangeExec(RoundRobinPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 
10)))
+    val reused2 = ReusedExchangeExec(exchange2.output, exchange2)
+    val join = SortMergeJoinExec(reused1.output, reused2.output, Inner, None, 
reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |+- ReusedExchange (2)
+                          |
+                          |
+                          |(1) ReusedExchange [Reuses operator id: 5]
+                          |Output [1]: [id#xL]
+                          |
+                          |(2) ReusedExchange [Reuses operator id: 7]
+                          |Output [1]: [id#xL]
+                          |
+                          |(3) SortMergeJoin
+                          |Left keys [1]: [id#xL]
+                          |Right keys [1]: [id#xL]
+                          |Join type: Inner
+                          |Join condition: None
+                          |
+                          |===== Adaptively Optimized Out Exchanges =====
+                          |
+                          |Subplan:1
+                          |Exchange (5)
+                          |+- Range (4)
+                          |
+                          |
+                          |(4) Range
+                          |Output [1]: [id#xL]
+                          |Arguments: Range (0, 1000, step=1, splits=Some(10))
+                          |
+                          |(5) Exchange
+                          |Input [1]: [id#xL]
+                          |Arguments: RoundRobinPartitioning(10), 
ENSURE_REQUIREMENTS, [plan_id=x]
+                          |
+                          |Subplan:2
+                          |Exchange (7)
+                          |+- Range (6)
+                          |
+                          |
+                          |(6) Range
+                          |Output [1]: [id#xL]
+                          |Arguments: Range (0, 1000, step=1, splits=Some(10))
+                          |
+                          |(7) Exchange
+                          |Input [1]: [id#xL]
+                          |Arguments: RoundRobinPartitioning(10), 
ENSURE_REQUIREMENTS, [plan_id=x]
+                          |
+                          |""".stripMargin
+    results = results.replaceAll("#\\d+", "#x").replaceAll("plan_id=\\d+", 
"plan_id=x")
+    assert(results == expectedTree)
+  }
 }
 
 case class ExplainSingleData(id: Int)


