[GitHub] spark pull request #21754: [SPARK-24705][SQL] Cannot reuse an exchange opera...
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207105078

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

Sorry to confuse you, but I'm working on only this issue in this PR. The title is probably obscure, so I'll update it soon.

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207104309

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         if (!conf.exchangeReuseEnabled) {
           return plan
         }
    +
         // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
         val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
    +
    +    def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => Boolean): SparkPlan = {
    +      // the exchanges that have same results usually also have same schemas (same column names).
    +      val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
    +      val samePlan = sameSchema.filter(filterCondition).find { e =>
    +        exchange.sameResult(e)
    +      }
    +      if (samePlan.isDefined) {
    +        // Keep the output of this exchange, the following plans require that to resolve
    +        // attributes.
    +        ReusedExchangeExec(exchange.output, samePlan.get)
    +      } else {
    +        sameSchema += exchange
    +        exchange
    +      }
    +    }
    +
         plan.transformUp {
    +      // For coordinated exchange
    +      case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        tryReuseExchange(exchange, {
    +          // We can reuse an exchange with the same coordinator only
    +          case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c
    --- End diff --

I checked again and found that we don't need this change (`sameResult` already handles this case correctly), so I'll drop it. Sorry to bother you.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207103198

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

They are two different bugs, aren't they?

---
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207102873

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

Is it bad to fix this in this PR?

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207101919

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

Ah, I see. Can we send a new PR for the bug fix?

---
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207101662

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

The master has the same problem? I checked the query below on master:
```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("SET spark.sql.adaptive.enabled=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
scala> val df = spark.range(1).selectExpr("id AS key", "id AS value")
scala> val resultDf = df.join(df, "key").join(df, "key")
scala> resultDf.show
...
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 101 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ...
```

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207096154

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

Why does this problem only show up with this patch?

---
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207090637

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         if (!conf.exchangeReuseEnabled) {
           return plan
         }
    +
         // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
         val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
    +
    +    def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => Boolean): SparkPlan = {
    +      // the exchanges that have same results usually also have same schemas (same column names).
    +      val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
    +      val samePlan = sameSchema.filter(filterCondition).find { e =>
    +        exchange.sameResult(e)
    +      }
    +      if (samePlan.isDefined) {
    +        // Keep the output of this exchange, the following plans require that to resolve
    +        // attributes.
    +        ReusedExchangeExec(exchange.output, samePlan.get)
    +      } else {
    +        sameSchema += exchange
    +        exchange
    +      }
    +    }
    +
         plan.transformUp {
    +      // For coordinated exchange
    +      case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        tryReuseExchange(exchange, {
    +          // We can reuse an exchange with the same coordinator only
    +          case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c
    --- End diff --

OK, I will.

---
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r207090221

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

`EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator` before `ReuseExchange` runs:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L85

For example, in the test in this PR, it sets `3` in `ExchangeCoordinator`:
https://github.com/apache/spark/pull/21754/files#diff-3cd46a3f60c5352282bd3f2c9efff7fcR505

`ReuseExchange` then reuses some exchanges, so the actual number of registered exchanges changes; e.g., in the test in this PR, the number changes from `3` to `2`. The assertion below in `ExchangeCoordinator` then fails because the logical number of exchanges and the actual number of registered exchanges differ:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201

The objective of this fix is to take the reused exchanges into account in `ExchangeCoordinator`.

---
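The count mismatch described in this comment can be sketched with a toy model. This is only an illustration under stated assumptions: `ToyCoordinator`, `countsMatch`, and the exchange names are hypothetical and are not Spark's `ExchangeCoordinator` API; the numbers `3` and `2` are the ones quoted in the comment.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: ToyCoordinator is hypothetical and just mimics the
// "expected vs. registered" bookkeeping described in the comment above.
object CoordinatorCountSketch {
  final class ToyCoordinator(expectedExchanges: Int) {
    private val registered = ArrayBuffer.empty[String]

    def registerExchange(name: String): Unit = registered += name

    // Stands in for the check that guards partition-size estimation.
    def countsMatch: Boolean = registered.size == expectedExchanges
  }

  // Planning expected 3 exchanges, but after reuse only 2 register themselves.
  def withoutFix(): Boolean = {
    val c = new ToyCoordinator(3)
    Seq("exchange-1", "exchange-2").foreach(c.registerExchange)
    c.countsMatch
  }

  // The reused wrapper registers its child once more, restoring the count to 3.
  def withFix(): Boolean = {
    val c = new ToyCoordinator(3)
    Seq("exchange-1", "exchange-2").foreach(c.registerExchange)
    c.registerExchange("exchange-1") // registration done on behalf of the reuse
    c.countsMatch
  }
}
```

In this model `withoutFix()` reports a mismatch and `withFix()` does not, which mirrors the intent of registering the reused exchange in `doPrepare()`.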
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r205929427

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
         if (!conf.exchangeReuseEnabled) {
           return plan
         }
    +
         // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
         val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
    +
    +    def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => Boolean): SparkPlan = {
    +      // the exchanges that have same results usually also have same schemas (same column names).
    +      val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
    +      val samePlan = sameSchema.filter(filterCondition).find { e =>
    +        exchange.sameResult(e)
    +      }
    +      if (samePlan.isDefined) {
    +        // Keep the output of this exchange, the following plans require that to resolve
    +        // attributes.
    +        ReusedExchangeExec(exchange.output, samePlan.get)
    +      } else {
    +        sameSchema += exchange
    +        exchange
    +      }
    +    }
    +
         plan.transformUp {
    +      // For coordinated exchange
    +      case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        tryReuseExchange(exchange, {
    +          // We can reuse an exchange with the same coordinator only
    +          case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c
    --- End diff --

Shall we just include `coordinator` in `ShuffleExchange#sameResult`?

---
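A rough sketch of the idea in this suggestion, combined with the schema-keyed buckets from the quoted diff. Everything here is a hypothetical stand-in (`ToyExchange`, `Kept`, `Reused`, the string-based "plan"); it is not how Spark's `sameResult` is implemented, only an illustration of folding the coordinator into the comparison so that no separate filter is needed.

```scala
import scala.collection.mutable

object ReuseBucketsSketch {
  // Hypothetical stand-in for an exchange: a schema, a canonical plan string,
  // and an optional coordinator id.
  final case class ToyExchange(
      schema: Seq[String],
      canonicalPlan: String,
      coordinator: Option[Int]) {
    // The coordinator is part of the comparison itself.
    def sameResult(other: ToyExchange): Boolean =
      canonicalPlan == other.canonicalPlan && coordinator == other.coordinator
  }

  sealed trait Planned
  final case class Kept(exchange: ToyExchange) extends Planned
  final case class Reused(original: ToyExchange) extends Planned

  // Bucket exchanges by schema so sameResult is only called within a bucket,
  // avoiding a comparison of every exchange against every other one.
  def reuse(exchanges: Seq[ToyExchange]): Seq[Planned] = {
    val bySchema =
      mutable.HashMap.empty[Seq[String], mutable.ArrayBuffer[ToyExchange]]
    exchanges.map { e =>
      val bucket =
        bySchema.getOrElseUpdate(e.schema, mutable.ArrayBuffer.empty[ToyExchange])
      bucket.find(other => e.sameResult(other)) match {
        case Some(existing) => Reused(existing)
        case None =>
          bucket += e
          Kept(e)
      }
    }
  }
}
```

With two identical exchanges on coordinator `1` and a third on coordinator `2`, only the second one is replaced by a reuse of the first.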
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r205929402

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
       // Ignore this wrapper for canonicalizing.
       override def doCanonicalize(): SparkPlan = child.canonicalized
    +  override protected def doPrepare(): Unit = {
    +    child match {
    +      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        coordinator.registerExchange(shuffleExchange)
    --- End diff --

Why is it needed? Do we forget to register the shuffle exchange in some cases?

---
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r203972003

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      */
     case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    +  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
    +    // If a coordinator defined in an exchange operator, the exchange cannot be reused
    --- End diff --

Ah, OK. I'll check if we can.

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r203968311

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      */
     case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    +  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
    +    // If a coordinator defined in an exchange operator, the exchange cannot be reused
    --- End diff --

I think comparing object references also works, since currently, if it's the same coordinator, it's the same object.

---
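For readers less familiar with Scala, the object-reference comparison mentioned here can be illustrated as follows. This is a sketch assuming the coordinator is an ordinary (non-case) class; `ToyCoordinator` and `sameCoordinator` are hypothetical names, not Spark code.

```scala
object CoordinatorIdentitySketch {
  // Hypothetical coordinator: a plain class, so two instances with equal
  // settings are still distinct objects.
  final class ToyCoordinator(val targetPostShuffleSize: Long)

  // `eq` compares references, so no explicit id field is needed as long as
  // "same coordinator" always means "same object".
  def sameCoordinator(a: ToyCoordinator, b: ToyCoordinator): Boolean = a eq b
}
```

If coordinators were ever copied or recreated during planning, reference comparison would stop identifying them, which is the case an explicit id would cover.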
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r203968016

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      */
     case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    +  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
    +    // If a coordinator defined in an exchange operator, the exchange cannot be reused
    --- End diff --

Can we assign an id to the `ExchangeCoordinator` so that we can correctly tell if they are the same coordinator?

---
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r203966689

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      */
     case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    +  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
    +    // If a coordinator defined in an exchange operator, the exchange cannot be reused
    --- End diff --

We might logically be able to reuse the same coordinator, but I think it is difficult to implement on the current master. In the current adaptive query execution, the exchanges (between stages) registered in a coordinator and their partition sizes are decided at runtime (inside `SparkPlan.execute()`). Since `ReuseExchange` runs in the final phase of planning, it is difficult to tell at that time which coordinator can be reused. So, to achieve the reuse, we might need to refactor this logic...

---
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r203630277

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      */
     case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    +  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
    +    // If a coordinator defined in an exchange operator, the exchange cannot be reused
    --- End diff --

What if it's the same coordinator? Can we reuse it?

---
Github user maropu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r203437795

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      */
     case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    +  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
    +    // If a coordinator defined in an exchange operator, the exchange cannot be reused
    --- End diff --

In the cache case, the reuse just doesn't happen, so no exception is thrown:
```
// the cache case
== Physical Plan ==
*(3) HashAggregate(keys=[imei#31], functions=[])
+- Exchange(coordinator id: 1599206176) hashpartitioning(imei#31, 200), coordinator[target post-shuffle partition size: 67108864]
   +- *(2) HashAggregate(keys=[imei#31], functions=[])
      +- *(2) Project [imei#31]
         +- *(2) BroadcastHashJoin [imei#31], [imei#101], Inner, BuildRight
            :- *(2) Filter isnotnull(imei#31)
            :  +- *(2) InMemoryTableScan [imei#31], [isnotnull(imei#31)]
            :        +- InMemoryRelation [imei#31, speed#32], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct ,None)
            :              +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct
            +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               +- *(1) Filter isnotnull(imei#101)
                  +- *(1) InMemoryTableScan [imei#101], [isnotnull(imei#101)]
                        +- InMemoryRelation [imei#101, speed#102], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct ,None)
                              +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct

// the non-cache case
scala> df.explain
== Physical Plan ==
*(5) HashAggregate(keys=[imei#0], functions=[])
+- *(5) HashAggregate(keys=[imei#0], functions=[])
   +- *(5) Project [imei#0]
      +- *(5) SortMergeJoin [imei#0], [imei#27], Inner
         :- *(2) Sort [imei#0 ASC NULLS FIRST], false, 0
         :  +- Exchange(coordinator id: 973215530) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
         :     +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#0] PushedFilters: [*IsNotNull(imei)], ReadSchema: struct
         +- *(4) Sort [imei#27 ASC NULLS FIRST], false, 0
            +- ReusedExchange [imei#27], Exchange(coordinator id: 973215530) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
```

`ExchangeCoordinator` determines how we shuffle data between stages, so if totally unrelated stages share an exchange, IIUC the sharing easily breaks the coordinator semantics. My hunch is that supporting the reuse of an exchange with a coordinator needs more logic in `ExchangeCoordinator` to take the sharing into consideration.

---
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r203416454

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
    @@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
      */
     case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
    +  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
    +    // If a coordinator defined in an exchange operator, the exchange cannot be reused
    --- End diff --

This seems overstated if this comment in the JIRA description is correct: "When the cache tabel device_loc is executed before this query is executed, everything is fine". In fact, if Xiao Li is correct in that statement, then this PR is eliminating a useful optimization in cases where it doesn't need to -- i.e. it is preventing Exchange reuse any time adaptive execution is used instead of only preventing reuse when it will actually cause a problem.

---
GitHub user maropu opened a pull request:

    https://github.com/apache/spark/pull/21754

    [SPARK-24705][SQL] Cannot reuse an exchange operator with an adaptive execution coordinator

    ## What changes were proposed in this pull request?
    This PR fixes a bug where an exchange operator with an adaptive execution coordinator is wrongly reused. If a coordinator is defined, the exchange has an independent shuffle strategy. Therefore, `ReuseExchange` cannot regard the exchange as reusable. This PR modifies the code to filter out these exchanges in `ReuseExchange`.

    ## How was this patch tested?
    Added tests in `ExchangeCoordinatorSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-24705-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21754

commit 32c4cb2881a791f459bb040eebc41a54d0d54384
Author: Takeshi Yamamuro
Date:   2018-07-12T02:35:12Z

    Fix

---
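The filtering described in this pull request summary might look roughly like the sketch below. It is a toy model under stated assumptions: `ToyExchange`, `ToyShuffle`, `ToyBroadcast`, and `supportReuse` are hypothetical names that only echo the truncated `supportReuseExchange` method in the quoted diff, whose real body is not shown in this thread.

```scala
object SupportReuseSketch {
  // Hypothetical exchange hierarchy; only shuffles can carry a coordinator.
  sealed trait ToyExchange
  final case class ToyShuffle(coordinator: Option[String]) extends ToyExchange
  case object ToyBroadcast extends ToyExchange

  // An exchange with a coordinator is treated as not reusable;
  // every other exchange stays eligible for reuse.
  def supportReuse(exchange: ToyExchange): Boolean = exchange match {
    case ToyShuffle(Some(_)) => false
    case _ => true
  }
}
```

As the later review comments note, this blanket exclusion also blocks reuse in cases where it would be safe, which is why the discussion moves toward comparing coordinators instead.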