peter-toth commented on a change in pull request #28885: URL: https://github.com/apache/spark/pull/28885#discussion_r459255589
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ########## @@ -95,46 +89,3 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan |""".stripMargin } } - -/** - * Find out duplicated exchanges in the spark plan, then use the same exchange for all the - * references. - */ -case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] { - - def apply(plan: SparkPlan): SparkPlan = { - if (!conf.exchangeReuseEnabled) { - return plan - } - // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls. - val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]() - - // Replace a Exchange duplicate with a ReusedExchange - def reuse: PartialFunction[Exchange, SparkPlan] = { - case exchange: Exchange => - val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]()) - val samePlan = sameSchema.find { e => - exchange.sameResult(e) - } - if (samePlan.isDefined) { - // Keep the output of this exchange, the following plans require that to resolve - // attributes. - ReusedExchangeExec(exchange.output, samePlan.get) - } else { - sameSchema += exchange - exchange - } - } - - plan transformUp { - case exchange: Exchange => reuse(exchange) - } transformAllExpressions { - // Lookup inside subqueries for duplicate exchanges - case in: InSubqueryExec => - val newIn = in.plan.transformUp { - case exchange: Exchange => reuse(exchange) - } - in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec]) - } - } Review comment: This corresponds to point 1 in the description and is tested by the case `SPARK-32041: No reuse interference inside ReuseExchange` in the new `ReuseExchangeAndSubquerySuite`: https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR28 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org