GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21754#discussion_r205929427
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala 
---
    @@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends 
Rule[SparkPlan] {
         if (!conf.exchangeReuseEnabled) {
           return plan
         }
    +
         // Build a hash map using schema of exchanges to avoid O(N*N) 
sameResult calls.
         val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
    +
    +    def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => 
Boolean): SparkPlan = {
    +      // the exchanges that have same results usually also have same 
schemas (same column names).
    +      val sameSchema = exchanges.getOrElseUpdate(exchange.schema, 
ArrayBuffer[Exchange]())
    +      val samePlan = sameSchema.filter(filterCondition).find { e =>
    +        exchange.sameResult(e)
    +      }
    +      if (samePlan.isDefined) {
    +        // Keep the output of this exchange, the following plans require 
that to resolve
    +        // attributes.
    +        ReusedExchangeExec(exchange.output, samePlan.get)
    +      } else {
    +        sameSchema += exchange
    +        exchange
    +      }
    +    }
    +
         plan.transformUp {
    +      // For coordinated exchange
    +      case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) =>
    +        tryReuseExchange(exchange, {
    +          // We can reuse an exchange with the same coordinator only
    +          case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c
    --- End diff --
    
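    Shall we just include `coordinator` in `ShuffleExchangeExec#sameResult`? Then exchanges with different coordinators would never be considered to produce the same result, and we would not need to pass a separate coordinator filter into `tryReuseExchange`.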


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to