GitHub user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21754#discussion_r205929427 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala --- @@ -89,23 +97,42 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] { if (!conf.exchangeReuseEnabled) { return plan } + // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls. val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]() + + def tryReuseExchange(exchange: Exchange, filterCondition: Exchange => Boolean): SparkPlan = { + // the exchanges that have same results usually also have same schemas (same column names). + val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]()) + val samePlan = sameSchema.filter(filterCondition).find { e => + exchange.sameResult(e) + } + if (samePlan.isDefined) { + // Keep the output of this exchange, the following plans require that to resolve + // attributes. + ReusedExchangeExec(exchange.output, samePlan.get) + } else { + sameSchema += exchange + exchange + } + } + plan.transformUp { + // For coordinated exchange + case exchange @ ShuffleExchangeExec(_, _, Some(coordinator)) => + tryReuseExchange(exchange, { + // We can reuse an exchange with the same coordinator only + case ShuffleExchangeExec(_, _, Some(c)) => coordinator == c --- End diff -- Shall we just include `coordinator` in `ShuffleExchange#sameResult`?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org