[GitHub] spark pull request #21754: [SPARK-24705][SQL] Cannot reuse an exchange opera...

2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207105078
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
 
+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

Sorry for the confusion, but in this PR I'm working only on that issue.
The title is probably unclear, so I'll update it soon.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org




2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207104309
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
     if (!conf.exchangeReuseEnabled) {
       return plan
     }
+
     // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
     val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
+
+    def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => Boolean): SparkPlan = {
+      // the exchanges that have same results usually also have same schemas (same column names).
+      val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
+      val samePlan = sameSchema.filter(filterCondition).find { e =>
+        exchange.sameResult(e)
+      }
+      if (samePlan.isDefined) {
+        // Keep the output of this exchange, the following plans require that to resolve
+        // attributes.
+        ReusedExchangeExec(exchange.output, samePlan.get)
+      } else {
+        sameSchema += exchange
+        exchange
+      }
+    }
+
     plan.transformUp {
+      // For coordinated exchange
+      case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        tryReuseExchange(exchange, {
+          // We can reuse an exchange with the same coordinator only
+          case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c
--- End diff --

I checked again and found that we don't need this change (`sameResult`
already handles this case correctly), so I'll drop it. Sorry to bother you.


---


2018-08-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207103198
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
 
+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

These are two different bugs, aren't they?


---


2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207102873
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
 
+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

Would it be a problem to fix this in this PR?


---


2018-08-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207101919
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
 
+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

Ah, I see. Can we send a new PR for that bug fix?


---


2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207101662
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
 
+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

Doesn't master have the same problem? I ran the query below on master:
```

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.0-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_31)
Type in expressions to have them evaluated.
Type :help for more information.

scala> sql("SET spark.sql.adaptive.enabled=true")
scala> sql("SET spark.sql.autoBroadcastJoinThreshold=-1")
scala> val df = spark.range(1).selectExpr("id AS key", "id AS value")
scala> val resultDf = df.join(df, "key").join(df, "key")
scala> resultDf.show
...
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 101 more
Caused by: java.lang.AssertionError: assertion failed
  at scala.Predef$.assert(Predef.scala:156)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.doEstimationIfNecessary(ExchangeCoordinator.scala:201)
  at org.apache.spark.sql.execution.exchange.ExchangeCoordinator.postShuffleRDD(ExchangeCoordinator.scala:259)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:124)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
...
```


---


2018-08-01 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207096154
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
 
+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

Why does this problem only show up with this patch?


---


2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207090637
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
     if (!conf.exchangeReuseEnabled) {
       return plan
     }
+
     // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
     val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
+
+    def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => Boolean): SparkPlan = {
+      // the exchanges that have same results usually also have same schemas (same column names).
+      val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
+      val samePlan = sameSchema.filter(filterCondition).find { e =>
+        exchange.sameResult(e)
+      }
+      if (samePlan.isDefined) {
+        // Keep the output of this exchange, the following plans require that to resolve
+        // attributes.
+        ReusedExchangeExec(exchange.output, samePlan.get)
+      } else {
+        sameSchema += exchange
+        exchange
+      }
+    }
+
     plan.transformUp {
+      // For coordinated exchange
+      case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        tryReuseExchange(exchange, {
+          // We can reuse an exchange with the same coordinator only
+          case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c
--- End diff --

OK, I will.


---


2018-08-01 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r207090221
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
 
+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

`EnsureRequirements` sets the number of exchanges in `ExchangeCoordinator`
before `ReuseExchange` runs:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L85

For example, in the test in this PR, it sets `3` in `ExchangeCoordinator`:

https://github.com/apache/spark/pull/21754/files#diff-3cd46a3f60c5352282bd3f2c9efff7fcR505

`ReuseExchange` then reuses some exchanges, which changes the actual number
of registered exchanges; e.g., in the test in this PR, the number changes from
`3` to `2`.

As a result, the assertion below in `ExchangeCoordinator` fails because the
number of exchanges set during planning no longer matches the actual number of
registered exchanges:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala#L201

The objective of this fix is to make `ExchangeCoordinator` take reused
exchanges into account when counting registered exchanges.
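To make the count mismatch concrete, here is a minimal Scala sketch under stated assumptions; it is not Spark code. `ToyExchange`, `ToyCoordinator`, `Real`, `ReusedRef`, and `RegistrationDemo` are hypothetical names, and `readyToEstimate` only imitates the idea behind the linked assertion (registrations must match the count fixed during planning); it does not reproduce `ExchangeCoordinator`'s actual logic. The "with fix" path models the `doPrepare` override in the diff, where the reuse wrapper registers its child exchange.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a shuffle exchange; it only carries an id.
final case class ToyExchange(id: Int)

// Hypothetical coordinator that is told up front how many exchanges it will
// coordinate (per the comment above, EnsureRequirements does this in Spark).
final class ToyCoordinator(val expectedNumExchanges: Int) {
  private val registered = ArrayBuffer.empty[ToyExchange]

  // Called from each exchange's prepare step.
  def register(e: ToyExchange): Unit = registered += e

  def numRegistered: Int = registered.length

  // Imitates the sanity check: estimation may only start once the number of
  // registrations equals the number announced during planning.
  def readyToEstimate: Boolean = registered.length == expectedNumExchanges
}

// Hypothetical plan leaves: a real exchange, or a wrapper that reuses one.
sealed trait ToyLeaf
final case class Real(e: ToyExchange) extends ToyLeaf
final case class ReusedRef(child: ToyExchange) extends ToyLeaf

object RegistrationDemo {
  // Planning announced 3 exchanges; reuse then turned the third into a
  // reference to exchange #2, so only two real exchanges remain.
  val plan: Seq[ToyLeaf] =
    Seq(Real(ToyExchange(1)), Real(ToyExchange(2)), ReusedRef(ToyExchange(2)))

  // Without the fix: only real exchanges register themselves.
  def prepareWithoutFix(c: ToyCoordinator): Unit = plan.foreach {
    case Real(e)      => c.register(e)
    case ReusedRef(_) => () // the wrapper does nothing
  }

  // With the fix modeled on the diff: the wrapper registers its child too.
  def prepareWithFix(c: ToyCoordinator): Unit = plan.foreach {
    case Real(e)          => c.register(e)
    case ReusedRef(child) => c.register(child)
  }
}
```

With `new ToyCoordinator(3)`, `prepareWithoutFix` leaves two registrations and `readyToEstimate` is false, the same 3-vs-2 situation described above; `prepareWithFix` restores the count to three by registering the reused child a second time.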



---


2018-07-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r205929427
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
     if (!conf.exchangeReuseEnabled) {
       return plan
     }
+
     // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls.
     val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
+
+    def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => Boolean): SparkPlan = {
+      // the exchanges that have same results usually also have same schemas (same column names).
+      val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]())
+      val samePlan = sameSchema.filter(filterCondition).find { e =>
+        exchange.sameResult(e)
+      }
+      if (samePlan.isDefined) {
+        // Keep the output of this exchange, the following plans require that to resolve
+        // attributes.
+        ReusedExchangeExec(exchange.output, samePlan.get)
+      } else {
+        sameSchema += exchange
+        exchange
+      }
+    }
+
     plan.transformUp {
+      // For coordinated exchange
+      case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        tryReuseExchange(exchange, {
+          // We can reuse an exchange with the same coordinator only
+          case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c
--- End diff --

Shall we just include `coordinator` in `ShuffleExchange#sameResult`?
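The suggestion can be sketched on toy types: fold the coordinator into the same-result check, so the schema-bucketed lookup from the diff needs no separate `filterCondition`. This is an illustrative simplification, not Spark's `sameResult`: `Coord`, `Node`, `Shuffle`, `Reused`, and `ToyReuse` are hypothetical, a string stands in for the canonicalized plan, and coordinators are compared by object reference.

```scala
import scala.collection.mutable

// Hypothetical coordinator: a plain class, so identity is reference identity.
final class Coord

// Hypothetical, heavily simplified plan nodes.
sealed trait Node
final case class Shuffle(schema: String, canonical: String, coord: Option[Coord]) extends Node {
  // "Same result" here requires the same canonical input AND the same
  // coordinator object (or no coordinator on either side).
  def sameResult(other: Shuffle): Boolean =
    canonical == other.canonical && ((coord, other.coord) match {
      case (Some(a), Some(b)) => a eq b
      case (None, None)       => true
      case _                  => false
    })
}
final case class Reused(of: Shuffle) extends Node

object ToyReuse {
  // Bucket exchanges by schema so sameResult only runs within a bucket,
  // echoing the HashMap[StructType, ArrayBuffer[Exchange]] in the diff.
  def reuse(exchanges: Seq[Shuffle]): Seq[Node] = {
    val bySchema = mutable.HashMap.empty[String, mutable.ArrayBuffer[Shuffle]]
    exchanges.map { e =>
      val bucket = bySchema.getOrElseUpdate(e.schema, mutable.ArrayBuffer.empty[Shuffle])
      bucket.find(prev => e.sameResult(prev)) match {
        case Some(prev) => Reused(prev)
        case None =>
          bucket += e // first of its kind: remember it for later lookups
          e
      }
    }
  }
}
```

Two exchanges over the same input with the same `Coord` object collapse into one plus a `Reused` reference, while an otherwise identical exchange with a different coordinator is kept. Putting the check inside `sameResult` keeps the rule itself unchanged, which is the appeal of the suggestion.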


---


2018-07-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r205929402
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -52,6 +52,14 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   // Ignore this wrapper for canonicalizing.
   override def doCanonicalize(): SparkPlan = child.canonicalized
 
+  override protected def doPrepare(): Unit = {
+    child match {
+      case shuffleExchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
+        coordinator.registerExchange(shuffleExchange)
--- End diff --

Why is it needed? Do we forget to register the shuffle exchange in some cases?


---


2018-07-20 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r203972003
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   */
 case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
 
+  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
+    // If a coordinator defined in an exchange operator, the exchange cannot be reused
--- End diff --

Ah, ok. I’ll check if we can.


---


2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r203968311
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   */
 case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
 
+  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
+    // If a coordinator defined in an exchange operator, the exchange cannot be reused
--- End diff --

I think comparing object references also works, since currently the same
coordinator is always the same object.
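The two ways of telling coordinators apart that come up in this thread, object reference and an explicit id, can be contrasted in a small sketch. `PlainCoordinator`, `IdCoordinator`, `CoordinatorIds`, and `IdentityDemo` are hypothetical; nothing here is claimed to match `ExchangeCoordinator`'s real fields. The value `67108864L` is simply the target post-shuffle partition size shown in the plans quoted in this thread.

```scala
import java.util.concurrent.atomic.AtomicInteger

// A plain (non-case) class: without an equals override, `==` falls back to
// reference identity, so two coordinators are equal only if they are the
// same object, even when their settings match.
final class PlainCoordinator(val targetPostShuffleSize: Long)

// Hypothetical id source used only for this sketch.
object CoordinatorIds {
  private val next = new AtomicInteger(0)
  def fresh(): Int = next.incrementAndGet()
}

// Hypothetical coordinator carrying an explicit id assigned at construction.
final class IdCoordinator(val targetPostShuffleSize: Long) {
  val id: Int = CoordinatorIds.fresh()
  // Compare by id instead of by reference.
  def sameCoordinator(other: IdCoordinator): Boolean = id == other.id
}

object IdentityDemo {
  val a = new PlainCoordinator(67108864L)
  val aliasOfA = a // same object, second name
  val b = new PlainCoordinator(67108864L) // same settings, different object

  val x = new IdCoordinator(67108864L)
  val y = new IdCoordinator(67108864L)
}
```

When each coordinator is a single object, as the comment above says is currently the case, the two checks give the same answers. An explicit id would only matter if coordinators could be copied, and then only if the copy kept the original id (in this sketch every new object draws a fresh one).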


---


2018-07-20 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r203968016
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   */
 case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
 
+  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
+    // If a coordinator defined in an exchange operator, the exchange cannot be reused
--- End diff --

Can we assign an ID to `ExchangeCoordinator` so that we can correctly tell
whether two coordinators are the same?


---


2018-07-20 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r203966689
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   */
 case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
 
+  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
+    // If a coordinator defined in an exchange operator, the exchange cannot be reused
--- End diff --

We might be able to logically reuse the same coordinator, but I think it is
difficult to implement on the current master. In the current adaptive query
execution, the exchanges (between stages) registered in a coordinator and their
partition sizes are decided at runtime (inside `SparkPlan.execute()`), whereas
`ReuseExchange` runs in the final phase of planning, so it is difficult to tell
at that time which coordinator can be reused. To achieve the reuse, we might
need some refactoring of this logic...


---


2018-07-19 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r203630277
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   */
 case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
 
+  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
+    // If a coordinator defined in an exchange operator, the exchange cannot be reused
--- End diff --

What if it's the same coordinator? Can we reuse it then?


---


2018-07-18 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r203437795
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   */
 case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
 
+  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
+    // If a coordinator defined in an exchange operator, the exchange cannot be reused
--- End diff --

In the cache case, the reuse just doesn't happen, so no exception is thrown:
```
// the cache case
== Physical Plan ==
*(3) HashAggregate(keys=[imei#31], functions=[])
+- Exchange(coordinator id: 1599206176) hashpartitioning(imei#31, 200), coordinator[target post-shuffle partition size: 67108864]
   +- *(2) HashAggregate(keys=[imei#31], functions=[])
      +- *(2) Project [imei#31]
         +- *(2) BroadcastHashJoin [imei#31], [imei#101], Inner, BuildRight
            :- *(2) Filter isnotnull(imei#31)
            :  +- *(2) InMemoryTableScan [imei#31], [isnotnull(imei#31)]
            :        +- InMemoryRelation [imei#31, speed#32], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct,None)
            :              +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct
            +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)))
               +- *(1) Filter isnotnull(imei#101)
                  +- *(1) InMemoryTableScan [imei#101], [isnotnull(imei#101)]
                        +- InMemoryRelation [imei#101, speed#102], CachedRDDBuilder(true,1,StorageLevel(disk, memory, deserialized, 1 replicas),*(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct,None)
                              +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#31,speed#32] PushedFilters: [], ReadSchema: struct

// the non-cache case
scala> df.explain
== Physical Plan ==
*(5) HashAggregate(keys=[imei#0], functions=[])
+- *(5) HashAggregate(keys=[imei#0], functions=[])
   +- *(5) Project [imei#0]
      +- *(5) SortMergeJoin [imei#0], [imei#27], Inner
         :- *(2) Sort [imei#0 ASC NULLS FIRST], false, 0
         :  +- Exchange(coordinator id: 973215530) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
         :     +- *(1) Scan JDBCRelation(device_loc) [numPartitions=1] [imei#0] PushedFilters: [*IsNotNull(imei)], ReadSchema: struct
         +- *(4) Sort [imei#27 ASC NULLS FIRST], false, 0
            +- ReusedExchange [imei#27], Exchange(coordinator id: 973215530) hashpartitioning(imei#0, 200), coordinator[target post-shuffle partition size: 67108864]
```
`ExchangeCoordinator` determines how we shuffle data between stages, so if
totally unrelated stages share an exchange, IIUC the sharing can easily break
the coordinator semantics. My hunch is that supporting reuse of an exchange with
a coordinator needs more logic in `ExchangeCoordinator` to take that sharing
into account.


---


2018-07-18 Thread markhamstra
Github user markhamstra commented on a diff in the pull request:

https://github.com/apache/spark/pull/21754#discussion_r203416454
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -85,14 +85,20 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan
   */
 case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
 
+  private def supportReuseExchange(exchange: Exchange): Boolean = exchange match {
+    // If a coordinator defined in an exchange operator, the exchange cannot be reused
--- End diff --

This seems overstated if this comment in the JIRA description is correct:
"When the cache table device_loc is executed before this query is executed,
everything is fine". In fact, if Xiao Li is correct in that statement, then
this PR is eliminating a useful optimization in cases where it doesn't need to;
that is, it prevents Exchange reuse whenever adaptive execution is used,
instead of only preventing reuse when it will actually cause a problem.


---


2018-07-11 Thread maropu
GitHub user maropu opened a pull request:

https://github.com/apache/spark/pull/21754

[SPARK-24705][SQL] Cannot reuse an exchange operator with an adaptive execution coordinator

## What changes were proposed in this pull request?
This PR fixes a bug where an exchange operator with an adaptive execution
coordinator is wrongly reused. If a coordinator is defined, the exchange has
its own independent shuffle strategy, so `ReuseExchange` cannot treat the
exchange as reusable. This PR modifies `ReuseExchange` to filter out such
exchanges.
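The approach described above (never treating a coordinated exchange as reusable) can be sketched with toy types. `Coordinator`, `Exch`, and `SkipCoordinated.supportReuse` are hypothetical stand-ins loosely named after the diff's `supportReuseExchange`; a string plays the role of the canonicalized plan, and the lookup is a plain quadratic scan rather than the schema-keyed map that `ReuseExchange` uses.

```scala
// Hypothetical coordinator: a plain class, compared by reference.
final class Coordinator

// Hypothetical exchange: a string stands in for the canonicalized plan.
final case class Exch(canonical: String, coordinator: Option[Coordinator])

object SkipCoordinated {
  // The PR's original idea: an exchange with a coordinator is never reusable.
  def supportReuse(e: Exch): Boolean = e.coordinator.isEmpty

  // For each exchange, the index of an earlier exchange it would reuse, if any.
  // A quadratic scan keeps the sketch short; the first match is always an
  // exchange that was itself kept, since nothing earlier can match it.
  def reuseTargets(exchanges: IndexedSeq[Exch]): IndexedSeq[Option[Int]] =
    exchanges.indices.map { i =>
      val e = exchanges(i)
      if (!supportReuse(e)) None
      else (0 until i).find(j => supportReuse(exchanges(j)) && exchanges(j).canonical == e.canonical)
    }
}
```

Two identical uncoordinated exchanges collapse into one, while two identical coordinated exchanges both stay, even when they share the same coordinator object. That blanket restriction is the trade-off the review comments about losing reuse under adaptive execution push back on.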

## How was this patch tested?
Added tests in `ExchangeCoordinatorSuite`.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maropu/spark SPARK-24705-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21754.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21754


commit 32c4cb2881a791f459bb040eebc41a54d0d54384
Author: Takeshi Yamamuro 
Date:   2018-07-12T02:35:12Z

Fix




---
