This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 76e4425efc2  [SPARK-42753] ReusedExchange refers to non-existent nodes
76e4425efc2 is described below

commit 76e4425efc22218fba04ad0aba8e6c1f6bb4954a
Author: Steven Chen <steven.c...@databricks.com>
AuthorDate: Wed Mar 22 09:28:18 2023 +0800

[SPARK-42753] ReusedExchange refers to non-existent nodes

### What changes were proposed in this pull request?

This PR addresses a rare bug in the EXPLAIN output and the Spark UI that can occur when AQE takes effect and the plan contains multiple ReusedExchange nodes. The bug causes a ReusedExchange to point to an unknown child, because that child subtree was "pruned" in a previous AQE iteration.

This PR fixes the issue by finding all ReusedExchange nodes in the tree whose `child` node has NOT been processed in the final plan (meaning it has no ID, or it carries a stale ID generated in a previous AQE iteration). It then traverses each such child subtree and generates correct IDs for it. The missing subtrees are printed in a new section called `Adaptively Optimized Out Exchanges`. A simplified, Spark-independent sketch of this detection idea is included after the example below.

### Why are the changes needed?

Below is an example that demonstrates the root cause:

> AdaptiveSparkPlan
> |-- SomeNode X (subquery xxx)
>     |-- Exchange A
>         |-- SomeNode Y
>             |-- Exchange B
>
> Subquery:Hosting operator = SomeNode Hosting Expression = xxx dynamicpruning#388
> AdaptiveSparkPlan
> |-- SomeNode M
>     |-- Exchange C
>         |-- SomeNode N
>             |-- Exchange D

Step 1: Exchange B is materialized and the QueryStage is added to the stage cache
Step 2: Exchange D reuses Exchange B
Step 3: Exchange C is materialized and the QueryStage is added to the stage cache
Step 4: Exchange A reuses Exchange C

The final plan then looks like:

> AdaptiveSparkPlan
> |-- SomeNode X (subquery xxx)
>     |-- Exchange A -> ReusedExchange (reuses Exchange C)
>
> Subquery:Hosting operator = SomeNode Hosting Expression = xxx dynamicpruning#388
> AdaptiveSparkPlan
> |-- SomeNode M
>     |-- Exchange C -> PhotonShuffleMapStage ....
>         |-- SomeNode N
>             |-- Exchange D -> ReusedExchange (reuses Exchange B)

As a result, the ReusedExchange (reuses Exchange B) refers to a non-existent node: Exchange B is no longer printed anywhere in the final plan.
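To make the mechanism easier to follow, here is a minimal, Spark-independent sketch of the detection idea. The `ExchangeNode`, `ReusedNode`, `LeafNode` types and the `ReusedExchangeSketch` object are invented for this illustration; the real logic lives in `ExplainUtils.processPlan` and `generateOperatorIDs`. IDs are assigned once per pass using a reference-identity visited set, reused nodes are collected along the way, and any reused target that was never visited in the pass is numbered afterwards so it can be printed under `Adaptively Optimized Out Exchanges`.

```
import java.util.{Collections, IdentityHashMap}

import scala.collection.mutable.ArrayBuffer

// Toy plan nodes standing in for SparkPlan / Exchange / ReusedExchangeExec.
sealed trait Node { def children: Seq[Node] }
case class ExchangeNode(children: Seq[Node]) extends Node
case class LeafNode() extends Node { def children: Seq[Node] = Nil }
case class ReusedNode(target: ExchangeNode) extends Node { def children: Seq[Node] = Nil }

object ReusedExchangeSketch {
  def main(args: Array[String]): Unit = {
    // Reference-identity set, in the spirit of newSetFromMap(new IdentityHashMap()) in the
    // patch, so stale IDs from an earlier pass can be deliberately overwritten.
    val visited = Collections.newSetFromMap[Node](new IdentityHashMap[Node, java.lang.Boolean]())
    val reusedNodes = ArrayBuffer.empty[ReusedNode]
    var currentId = 0

    // Assign an ID to every not-yet-visited node and remember every Reused node we see.
    def assignIds(node: Node): Unit = {
      if (!visited.contains(node)) {
        node match {
          case r: ReusedNode => reusedNodes += r
          case _ =>
        }
        visited.add(node)
        currentId += 1
        println(s"id $currentId -> ${node.getClass.getSimpleName}")
        node.children.foreach(assignIds)
      }
    }

    // A Reused node whose target exchange is NOT reachable from the main tree,
    // mimicking an exchange that AQE pruned in an earlier iteration.
    val optimizedOut = ExchangeNode(Seq(LeafNode()))
    val mainPlan: Node = ExchangeNode(Seq(ReusedNode(optimizedOut)))

    assignIds(mainPlan)

    // Any reused target that was never visited gets IDs now; these are the subtrees that
    // would be printed under "Adaptively Optimized Out Exchanges".
    val missing = reusedNodes.map(_.target).filter(t => !visited.contains(t))
    println(s"optimized-out subtrees: ${missing.size}")
    missing.foreach(assignIds)
  }
}
```

The identity-based visited set (rather than only checking the existing `OP_ID_TAG`) is what allows IDs left over from a previous AQE iteration to be intentionally overwritten in the current pass.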
### Does this PR introduce _any_ user-facing change?

**Explain Text Before and After**

**Before:**
```
+- ReusedExchange (105)

(105) ReusedExchange [Reuses operator id: unknown]
Output [3]: [sr_customer_sk#303, sr_store_sk#307, sum#413L]
```

**After:**
```
+- ReusedExchange (105)
   +- Exchange (132)
      +- * HashAggregate (131)
         +- * Project (130)
            +- * BroadcastHashJoin Inner BuildRight (129)
               :- * Filter (128)
               :  +- * ColumnarToRow (127)
               :     +- Scan parquet hive_metastore.tpcds_sf1000_delta.store_returns (126)
               +- ShuffleQueryStage (115), Statistics(sizeInBytes=5.7 KiB, rowCount=366, [d_date_sk#234 -> ColumnStat(Some(362),Some(2415022),Some(2488070),Some(0),Some(4),Some(4),None,2)], isRuntime=true)
                  +- ReusedExchange (114)

(105) ReusedExchange [Reuses operator id: 132]
Output [3]: [sr_customer_sk#217, sr_store_sk#221, sum#327L]

(126) Scan parquet hive_metastore.tpcds_sf1000_delta.store_returns
Output [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, sr_returned_date_sk#214]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/mnt/performance-datasets/2018TPC/tpcds-2.4/sf1000_delta/store_returns]
PartitionFilters: [isnotnull(sr_returned_date_sk#214), dynamicpruningexpression(sr_returned_date_sk#214 IN dynamicpruning#329)]
PushedFilters: [IsNotNull(sr_store_sk)]
ReadSchema: struct<sr_customer_sk:int,sr_store_sk:int,sr_return_amt:decimal(7,2)>

(127) ColumnarToRow
Input [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, sr_returned_date_sk#214]

(128) Filter
Input [4]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, sr_returned_date_sk#214]
Condition : isnotnull(sr_store_sk#221)

(114) ReusedExchange [Reuses operator id: 8]
Output [1]: [d_date_sk#234]

(115) ShuffleQueryStage
Output [1]: [d_date_sk#234]
Arguments: 2, Statistics(sizeInBytes=5.7 KiB, rowCount=366, [d_date_sk#234 -> ColumnStat(Some(362),Some(2415022),Some(2488070),Some(0),Some(4),Some(4),None,2)], isRuntime=true)

(129) BroadcastHashJoin
Left keys [1]: [sr_returned_date_sk#214]
Right keys [1]: [d_date_sk#234]
Join type: Inner
Join condition: None

(130) Project
Output [3]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225]
Input [5]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225, sr_returned_date_sk#214, d_date_sk#234]

(131) HashAggregate
Input [3]: [sr_customer_sk#217, sr_store_sk#221, sr_return_amt#225]
Keys [2]: [sr_customer_sk#217, sr_store_sk#221]
Functions [1]: [partial_sum(UnscaledValue(sr_return_amt#225)) AS sum#327L]
Aggregate Attributes [1]: [sum#326L]
Results [3]: [sr_customer_sk#217, sr_store_sk#221, sum#327L]

(132) Exchange
Input [3]: [sr_customer_sk#217, sr_store_sk#221, sum#327L]
Arguments: hashpartitioning(sr_store_sk#221, 200), ENSURE_REQUIREMENTS, [plan_id=1791]
```

**Spark UI Before and After**

**Before:**

<img width="339" alt="Screenshot 2023-03-10 at 10 52 46 AM" src="https://user-images.githubusercontent.com/83618776/224406011-e622ad11-37e6-48c6-b556-cd5c7708e237.png">

**After:**

![image](https://user-images.githubusercontent.com/83618776/224406076-4fcbf918-2a8d-4776-b91a-36815752cf2a.png)

### How was this patch tested?

Unit tests were added to `ExplainSuite`, and the change was also verified manually by running `ExplainSuite`.
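For reference (not part of this patch), the plan text shown above is Spark's formatted explain output, which is produced through `ExplainUtils.processPlan`. A minimal sketch of how a user would request it is below; the query, object name, and session settings are invented for illustration, and an ordinary query like this one will not necessarily hit the rare reuse pattern that produces the new section.

```
// Hypothetical usage: "formatted" explain mode goes through ExplainUtils.processPlan.
// The "Adaptively Optimized Out Exchanges" section only shows up in the rare case where a
// ReusedExchange points at an exchange that was pruned in an earlier AQE iteration.
import org.apache.spark.sql.SparkSession

object FormattedExplainDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("formatted-explain-demo")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((1, "a"), (2, "b"), (3, "a")).toDF("id", "value")
      .groupBy("value")
      .count()

    // The same plan text is available in SQL via: EXPLAIN FORMATTED <query>
    df.explain("formatted")

    spark.stop()
  }
}
```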
Closes #40385 from StevenChenDatabricks/fix-reused.

Authored-by: Steven Chen <steven.c...@databricks.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/execution/ExplainUtils.scala |  81 ++++++++--
 .../scala/org/apache/spark/sql/ExplainSuite.scala | 165 +++++++++++++++++++++
 2 files changed, 237 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index 12ffbc8554e..3da3e646f36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -17,12 +17,17 @@

 package org.apache.spark.sql.execution

+import java.util.Collections.newSetFromMap
+import java.util.IdentityHashMap
+import java.util.Set
+
 import scala.collection.mutable.{ArrayBuffer, BitSet}

 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
+import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}

 object ExplainUtils extends AdaptiveSparkPlanHelper {
   /**
@@ -73,14 +78,34 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    */
   def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
     try {
+      // Initialize a reference-unique set of Operators to avoid accidental overwrites and to allow
+      // intentional overwriting of IDs generated in previous AQE iteration
+      val operators = newSetFromMap[QueryPlan[_]](new IdentityHashMap())
+      // Initialize an array of ReusedExchanges to help find Adaptively Optimized Out
+      // Exchanges as part of SPARK-42753
+      val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]
+
       var currentOperatorID = 0
-      currentOperatorID = generateOperatorIDs(plan, currentOperatorID)
+      currentOperatorID = generateOperatorIDs(plan, currentOperatorID, operators, reusedExchanges,
+        true)

       val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
       getSubqueries(plan, subqueries)

-      subqueries.foldLeft(currentOperatorID) {
-        (curId, plan) => generateOperatorIDs(plan._3.child, curId)
+      currentOperatorID = subqueries.foldLeft(currentOperatorID) {
+        (curId, plan) => generateOperatorIDs(plan._3.child, curId, operators, reusedExchanges,
+          true)
+      }
+
+      // SPARK-42753: Process subtree for a ReusedExchange with unknown child
+      val optimizedOutExchanges = ArrayBuffer.empty[Exchange]
+      reusedExchanges.foreach{ reused =>
+        val child = reused.child
+        if (!operators.contains(child)) {
+          optimizedOutExchanges.append(child)
+          currentOperatorID = generateOperatorIDs(child, currentOperatorID, operators,
+            reusedExchanges, false)
+        }
       }

       val collectedOperators = BitSet.empty
@@ -103,6 +128,17 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
         }
         append("\n")
       }
+
+      i = 0
+      optimizedOutExchanges.foreach{ exchange =>
+        if (i == 0) {
+          append("\n===== Adaptively Optimized Out Exchanges =====\n\n")
+        }
+        i = i + 1
+        append(s"Subplan:$i\n")
+        processPlanSkippingSubqueries[SparkPlan](exchange, append, collectedOperators)
+        append("\n")
+      }
     } finally {
       removeTags(plan)
     }
@@ -119,17 +155,40 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    * @param plan Input query plan to process
    * @param startOperatorID The start value of operation id. The subsequent operations will be
    *                        assigned higher value.
+   * @param visited A unique set of operators visited by generateOperatorIds. The set is scoped
+   *                at the callsite function processPlan. It serves two purposes: Firstly, it is
+   *                used to avoid accidentally overwriting existing IDs that were generated in
+   *                the same processPlan call. Secondly, it is used to allow for intentional ID
+   *                overwriting as part of SPARK-42753 where an Adaptively Optimized Out Exchange
+   *                and its subtree may contain IDs that were generated in a previous AQE
+   *                iteration's processPlan call which would result in incorrect IDs.
+   * @param reusedExchanges A unique set of ReusedExchange nodes visited which will be used to
+   *                        identify adaptively optimized out exchanges in SPARK-42753.
+   * @param addReusedExchanges Whether to add ReusedExchange nodes to reusedExchanges set. We set it
+   *                           to false to avoid processing more nested ReusedExchange nodes in the
+   *                           subtree of an Adaptively Optimized Out Exchange.
    * @return The last generated operation id for this input plan. This is to ensure we always
    *         assign incrementing unique id to each operator.
    */
-  private def generateOperatorIDs(plan: QueryPlan[_], startOperatorID: Int): Int = {
+  private def generateOperatorIDs(
+      plan: QueryPlan[_],
+      startOperatorID: Int,
+      visited: Set[QueryPlan[_]],
+      reusedExchanges: ArrayBuffer[ReusedExchangeExec],
+      addReusedExchanges: Boolean): Int = {
     var currentOperationID = startOperatorID
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return currentOperationID
     }

-    def setOpId(plan: QueryPlan[_]): Unit = if (plan.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
+    def setOpId(plan: QueryPlan[_]): Unit = if (!visited.contains(plan)) {
+      plan match {
+        case r: ReusedExchangeExec if addReusedExchanges =>
+          reusedExchanges.append(r)
+        case _ =>
+      }
+      visited.add(plan)
       currentOperationID += 1
       plan.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
     }
@@ -138,18 +197,22 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       case _: WholeStageCodegenExec =>
       case _: InputAdapter =>
       case p: AdaptiveSparkPlanExec =>
-        currentOperationID = generateOperatorIDs(p.executedPlan, currentOperationID)
+        currentOperationID = generateOperatorIDs(p.executedPlan, currentOperationID, visited,
+          reusedExchanges, addReusedExchanges)
         if (!p.executedPlan.fastEquals(p.initialPlan)) {
-          currentOperationID = generateOperatorIDs(p.initialPlan, currentOperationID)
+          currentOperationID = generateOperatorIDs(p.initialPlan, currentOperationID, visited,
+            reusedExchanges, addReusedExchanges)
         }
         setOpId(p)
       case p: QueryStageExec =>
-        currentOperationID = generateOperatorIDs(p.plan, currentOperationID)
+        currentOperationID = generateOperatorIDs(p.plan, currentOperationID, visited,
+          reusedExchanges, addReusedExchanges)
         setOpId(p)
       case other: QueryPlan[_] =>
        setOpId(other)
        currentOperationID = other.innerChildren.foldLeft(currentOperationID) {
-          (curId, plan) => generateOperatorIDs(plan, curId)
+          (curId, plan) => generateOperatorIDs(plan, curId, visited, reusedExchanges,
+            addReusedExchanges)
        }
     }
     currentOperationID
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index a6b295578d6..3ed989c4035 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -17,9 +17,13 @@

 package org.apache.spark.sql

+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.physical.RoundRobinPartitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
+import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.joins.SortMergeJoinExec
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.TestOptionsSource
@@ -771,6 +775,167 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode, statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge case that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(RoundRobinPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(exchange.output, exchange)
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |
+                          |
+                          |(1) ReusedExchange [Reuses operator id: 3]
+                          |Output [1]: [id#xL]
+                          |
+                          |===== Adaptively Optimized Out Exchanges =====
+                          |
+                          |Subplan:1
+                          |Exchange (3)
+                          |+- Range (2)
+                          |
+                          |
+                          |(2) Range
+                          |Output [1]: [id#xL]
+                          |Arguments: Range (0, 1000, step=1, splits=Some(10))
+                          |
+                          |(3) Exchange
+                          |Input [1]: [id#xL]
+                          |Arguments: RoundRobinPartitioning(10), ENSURE_REQUIREMENTS, [plan_id=x]
+                          |
+                          |""".stripMargin
+
+    results = results.replaceAll("#\\d+", "#x").replaceAll("plan_id=\\d+", "plan_id=x")
+    assert(results == expectedTree)
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with two ReusedExchange nodes reusing the same exchange
+    // Only one exchange node should be printed
+    val exchange = ShuffleExchangeExec(RoundRobinPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused1 = ReusedExchangeExec(exchange.output, exchange)
+    val reused2 = ReusedExchangeExec(exchange.output, exchange)
+    val join = SortMergeJoinExec(reused1.output, reused2.output, Inner, None, reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |+- ReusedExchange (2)
+                          |
+                          |
+                          |(1) ReusedExchange [Reuses operator id: 5]
+                          |Output [1]: [id#xL]
+                          |
+                          |(2) ReusedExchange [Reuses operator id: 5]
+                          |Output [1]: [id#xL]
+                          |
+                          |(3) SortMergeJoin
+                          |Left keys [1]: [id#xL]
+                          |Right keys [1]: [id#xL]
+                          |Join type: Inner
+                          |Join condition: None
+                          |
+                          |===== Adaptively Optimized Out Exchanges =====
+                          |
+                          |Subplan:1
+                          |Exchange (5)
+                          |+- Range (4)
+                          |
+                          |
+                          |(4) Range
+                          |Output [1]: [id#xL]
+                          |Arguments: Range (0, 1000, step=1, splits=Some(10))
+                          |
+                          |(5) Exchange
+                          |Input [1]: [id#xL]
+                          |Arguments: RoundRobinPartitioning(10), ENSURE_REQUIREMENTS, [plan_id=x]
+                          |
+                          |""".stripMargin
+    results = results.replaceAll("#\\d+", "#x").replaceAll("plan_id=\\d+", "plan_id=x")
+    assert(results == expectedTree)
expectedTree) + } + + test("SPARK-42753: Correctly separate two ReusedExchange not sharing subtree") { + // Simulate two ReusedExchanges reusing two different Exchanges that appear similar + // The two exchanges should have separate IDs and printed separately + val exchange1 = ShuffleExchangeExec(RoundRobinPartitioning(10), + RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10))) + val reused1 = ReusedExchangeExec(exchange1.output, exchange1) + val exchange2 = ShuffleExchangeExec(RoundRobinPartitioning(10), + RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10))) + val reused2 = ReusedExchangeExec(exchange2.output, exchange2) + val join = SortMergeJoinExec(reused1.output, reused2.output, Inner, None, reused1, reused2) + + var results = "" + def appendStr(str: String): Unit = { + results = results + str + } + + ExplainUtils.processPlan[SparkPlan](join, appendStr(_)) + + val expectedTree = """|SortMergeJoin Inner (3) + |:- ReusedExchange (1) + |+- ReusedExchange (2) + | + | + |(1) ReusedExchange [Reuses operator id: 5] + |Output [1]: [id#xL] + | + |(2) ReusedExchange [Reuses operator id: 7] + |Output [1]: [id#xL] + | + |(3) SortMergeJoin + |Left keys [1]: [id#xL] + |Right keys [1]: [id#xL] + |Join type: Inner + |Join condition: None + | + |===== Adaptively Optimized Out Exchanges ===== + | + |Subplan:1 + |Exchange (5) + |+- Range (4) + | + | + |(4) Range + |Output [1]: [id#xL] + |Arguments: Range (0, 1000, step=1, splits=Some(10)) + | + |(5) Exchange + |Input [1]: [id#xL] + |Arguments: RoundRobinPartitioning(10), ENSURE_REQUIREMENTS, [plan_id=x] + | + |Subplan:2 + |Exchange (7) + |+- Range (6) + | + | + |(6) Range + |Output [1]: [id#xL] + |Arguments: Range (0, 1000, step=1, splits=Some(10)) + | + |(7) Exchange + |Input [1]: [id#xL] + |Arguments: RoundRobinPartitioning(10), ENSURE_REQUIREMENTS, [plan_id=x] + | + |""".stripMargin + results = results.replaceAll("#\\d+", "#x").replaceAll("plan_id=\\d+", "plan_id=x") + assert(results == expectedTree) + } } case class ExplainSingleData(id: Int) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org