peter-toth commented on a change in pull request #28885: URL: https://github.com/apache/spark/pull/28885#discussion_r459255589
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ########## @@ -95,46 +89,3 @@ case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchan |""".stripMargin } } - -/** - * Find out duplicated exchanges in the spark plan, then use the same exchange for all the - * references. - */ -case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] { - - def apply(plan: SparkPlan): SparkPlan = { - if (!conf.exchangeReuseEnabled) { - return plan - } - // Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls. - val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]() - - // Replace a Exchange duplicate with a ReusedExchange - def reuse: PartialFunction[Exchange, SparkPlan] = { - case exchange: Exchange => - val sameSchema = exchanges.getOrElseUpdate(exchange.schema, ArrayBuffer[Exchange]()) - val samePlan = sameSchema.find { e => - exchange.sameResult(e) - } - if (samePlan.isDefined) { - // Keep the output of this exchange, the following plans require that to resolve - // attributes. - ReusedExchangeExec(exchange.output, samePlan.get) - } else { - sameSchema += exchange - exchange - } - } - - plan transformUp { - case exchange: Exchange => reuse(exchange) - } transformAllExpressions { - // Lookup inside subqueries for duplicate exchanges - case in: InSubqueryExec => - val newIn = in.plan.transformUp { - case exchange: Exchange => reuse(exchange) - } - in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec]) - } - } Review comment: This is the issue described in point 1 of the PR description, and it is tested with the case `SPARK-32041: No reuse interference inside ReuseExchange` in the new `ReuseExchangeAndSubquerySuite`: https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR28 Combining the two rules is required to fix the issue described in point 2 of the PR description,
which is tested with the case `SPARK-32041: No reuse interference between ReuseExchange and ReuseSubquery`: https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR67 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org