GitHub user cenyuhai commented on a diff in the pull request: https://github.com/apache/spark/pull/19756#discussion_r151612678 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala --- @@ -109,3 +109,67 @@ case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] { } } } + +/** + * Find out duplicated coordinated exchanges in the spark plan, then use the same exchange for all + * the references. + */ +case class ReuseExchangeWithCoordinator(conf: SQLConf) extends Rule[SparkPlan] { + + // Returns true if a SparkPlan has coordinated ShuffleExchangeExec children. + private def hasCoordinatedExchanges(plan: SparkPlan): Boolean = { + plan.children.nonEmpty && plan.children.forall { + case ShuffleExchangeExec(_, _, Some(_)) => true + case _ => false + } + } + + // Returns true if two sequences of exchanges are producing the same results. + private def hasExchangesWithSameResults( + source: Seq[ShuffleExchangeExec], + target: Seq[ShuffleExchangeExec]): Boolean = { + source.length == target.length && + source.zip(target).forall(x => x._1.withoutCoordinator.sameResult(x._2.withoutCoordinator)) + } + + type CoordinatorSignature = (Int, Long, Option[Int]) + + def apply(plan: SparkPlan): SparkPlan = { + if (!conf.exchangeReuseEnabled) { --- End diff -- I don't know whether Spark 2.2 still has this bug or not. I am using Spark 2.1.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org