Repository: spark
Updated Branches:
  refs/heads/master 02f967795 -> efef55388
[SPARK-24705][SQL] ExchangeCoordinator broken when duplicate exchanges reused

## What changes were proposed in this pull request?

In the current master, `EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange` runs. `ReuseExchange` then removes duplicate exchanges, so the actual number of registered exchanges changes. Finally, the assertion in `ExchangeCoordinator` fails because the expected number of exchanges and the actual number of registered exchanges differ:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201

This PR fixes the issue. The code to reproduce it is as follows:
```
scala> sql("SET spark.sql.adaptive.enabled=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
scala> val df = spark.range(1).selectExpr("id AS key", "id AS value")
scala> val resultDf = df.join(df, "key").join(df, "key")
scala> resultDf.show
...
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 101 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ...
```

## How was this patch tested?

Added tests in `ExchangeCoordinatorSuite`.

Author: Takeshi Yamamuro <yamam...@apache.org>

Closes #21754 from maropu/SPARK-24705-2.
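To make the shape of the fix concrete, below is a minimal sketch of the pattern the patch adopts; the `Coordinator`, `register`, and `estimate` names are illustrative stand-ins, not the real `ExchangeCoordinator` API. The idea is to defer the exchange count to a `lazy val`, so it is computed on first use, after `ReuseExchange` has deduplicated the plan and every surviving exchange has registered, instead of being fixed at construction time:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for ExchangeCoordinator; not the real API.
class Coordinator {
  private val exchanges = ArrayBuffer[String]()

  // Computed on first access, i.e. after all registrations have happened,
  // so plan rewrites that drop duplicate exchanges cannot invalidate it.
  private lazy val numExchanges = exchanges.size

  def register(exchange: String): Unit = exchanges += exchange

  def estimate(statsReceived: Int): Unit = {
    // `<=` rather than `==`: a shuffle dependency with zero partitions
    // submits no stage and therefore reports no MapOutputStatistics.
    assert(statsReceived <= numExchanges)
  }
}

// With an eager count of 3 taken before deduplication, removing one
// duplicate exchange would leave only two registrations and trip the
// assertion; the lazy count sees the final, post-deduplication size.
val c = new Coordinator
c.register("ex1"); c.register("ex2")
c.estimate(2)  // passes: 2 <= 2
```

The trade-off, noted in the patch's own comment, is that registering a new exchange after the first `postShuffleRDD` call now fails the `assert(exchanges.length == numExchanges)` check in `doEstimationIfNecessary`, which surfaces incorrect use of the class.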
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/efef5538
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/efef5538
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/efef5538

Branch: refs/heads/master
Commit: efef55388fedef3f7954a385776e666ad4597a58
Parents: 02f9677
Author: Takeshi Yamamuro <yamam...@apache.org>
Authored: Thu Aug 2 13:05:36 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Thu Aug 2 13:05:36 2018 -0700

----------------------------------------------------------------------
 .../execution/exchange/EnsureRequirements.scala |  1 -
 .../exchange/ExchangeCoordinator.scala          | 17 ++++++++++------
 .../execution/ExchangeCoordinatorSuite.scala    | 21 ++++++++++++++++----
 3 files changed, 28 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index d96ecba..d2d5011 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -82,7 +82,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
     if (adaptiveExecutionEnabled && supportsCoordinator) {
       val coordinator =
         new ExchangeCoordinator(
-          children.length,
           targetPostShuffleInputSize,
           minNumPostShufflePartitions)
       children.zip(requiredChildDistributions).map {

http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
index 051e610..f5d93ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -83,7 +83,6 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan}
  *  - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB)
  */
 class ExchangeCoordinator(
-    numExchanges: Int,
     advisoryTargetPostShuffleInputSize: Long,
     minNumPostShufflePartitions: Option[Int] = None)
   extends Logging {
@@ -91,8 +90,14 @@ class ExchangeCoordinator(
   // The registered Exchange operators.
   private[this] val exchanges = ArrayBuffer[ShuffleExchangeExec]()

+  // `lazy val` is used here so that we could notice the wrong use of this class, e.g., all the
+  // exchanges should be registered before `postShuffleRDD` called first time. If a new exchange is
+  // registered after the `postShuffleRDD` call, `assert(exchanges.length == numExchanges)` fails
+  // in `doEstimationIfNecessary`.
+  private[this] lazy val numExchanges = exchanges.size
+
   // This map is used to lookup the post-shuffle ShuffledRowRDD for an Exchange operator.
-  private[this] val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
+  private[this] lazy val postShuffleRDDs: JMap[ShuffleExchangeExec, ShuffledRowRDD] =
     new JHashMap[ShuffleExchangeExec, ShuffledRowRDD](numExchanges)

   // A boolean that indicates if this coordinator has made decision on how to shuffle data.
@@ -117,10 +122,6 @@ class ExchangeCoordinator(
    */
   def estimatePartitionStartIndices(
       mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = {
-    // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
-    // a stage when the number of partitions of this dependency is 0.
-    assert(mapOutputStatistics.length <= numExchanges)
-
     // If minNumPostShufflePartitions is defined, it is possible that we need to use a
     // value less than advisoryTargetPostShuffleInputSize as the target input size of
     // a post shuffle task.
@@ -228,6 +229,10 @@ class ExchangeCoordinator(
         j += 1
       }

+      // If we have mapOutputStatistics.length < numExchange, it is because we do not submit
+      // a stage when the number of partitions of this dependency is 0.
+      assert(mapOutputStatistics.length <= numExchanges)
+
       // Now, we estimate partitionStartIndices. partitionStartIndices.length will be the
       // number of post-shuffle partitions.
       val partitionStartIndices =

http://git-wip-us.apache.org/repos/asf/spark/blob/efef5538/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
index 737eeb0..b736d43 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeCoordinatorSuite.scala
@@ -21,7 +21,7 @@ import org.scalatest.BeforeAndAfterAll

 import org.apache.spark.{MapOutputStatistics, SparkConf, SparkFunSuite}
 import org.apache.spark.sql._
-import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{ExchangeCoordinator, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf

@@ -58,7 +58,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }

   test("test estimatePartitionStartIndices - 1 Exchange") {
-    val coordinator = new ExchangeCoordinator(1, 100L)
+    val coordinator = new ExchangeCoordinator(100L)

     {
       // All bytes per partition are 0.
@@ -105,7 +105,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }

   test("test estimatePartitionStartIndices - 2 Exchanges") {
-    val coordinator = new ExchangeCoordinator(2, 100L)
+    val coordinator = new ExchangeCoordinator(100L)

     {
       // If there are multiple values of the number of pre-shuffle partitions,
@@ -199,7 +199,7 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
   }

   test("test estimatePartitionStartIndices and enforce minimal number of reducers") {
-    val coordinator = new ExchangeCoordinator(2, 100L, Some(2))
+    val coordinator = new ExchangeCoordinator(100L, Some(2))

     {
       // The minimal number of post-shuffle partitions is not enforced because
@@ -480,4 +480,17 @@ class ExchangeCoordinatorSuite extends SparkFunSuite with BeforeAndAfterAll {
       withSparkSession(test, 6144, minNumPostShufflePartitions)
     }
   }
+
+  test("SPARK-24705 adaptive query execution works correctly when exchange reuse enabled") {
+    val test = { spark: SparkSession =>
+      spark.sql("SET spark.sql.exchange.reuse=true")
+      val df = spark.range(1).selectExpr("id AS key", "id AS value")
+      val resultDf = df.join(df, "key").join(df, "key")
+      val sparkPlan = resultDf.queryExecution.executedPlan
+      assert(sparkPlan.collect { case p: ReusedExchangeExec => p }.length == 1)
+      assert(sparkPlan.collect { case p @ ShuffleExchangeExec(_, _, Some(c)) => p }.length == 3)
+      checkAnswer(resultDf, Row(0, 0, 0, 0) :: Nil)
+    }
+    withSparkSession(test, 4, None)
+  }
 }
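For reference, the scenario can also be exercised end-to-end from `spark-shell`. This sketch simply combines the reproduction from the PR description above with the `spark.sql.exchange.reuse` flag the new test enables; the expected single-row result matches the test's `checkAnswer`:

```scala
// Run inside spark-shell; mirrors the reproduction above plus the reuse flag.
sql("SET spark.sql.adaptive.enabled=true")
sql("SET spark.sql.exchange.reuse=true")
sql("SET spark.sql.autoBroadcastJoinThreshold=-1")

val df = spark.range(1).selectExpr("id AS key", "id AS value")
val resultDf = df.join(df, "key").join(df, "key")

// Before this patch: java.lang.AssertionError in doEstimationIfNecessary.
// After this patch: prints a single row, (0, 0, 0, 0).
resultDf.show()
```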