peter-toth commented on a change in pull request #28885:
URL: https://github.com/apache/spark/pull/28885#discussion_r459255589



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala
##########
@@ -95,46 +89,3 @@ case class ReusedExchangeExec(override val output: 
Seq[Attribute], child: Exchan
        |""".stripMargin
   }
 }
-
-/**
- * Find out duplicated exchanges in the spark plan, then use the same exchange 
for all the
- * references.
- */
-case class ReuseExchange(conf: SQLConf) extends Rule[SparkPlan] {
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    if (!conf.exchangeReuseEnabled) {
-      return plan
-    }
-    // Build a hash map using schema of exchanges to avoid O(N*N) sameResult 
calls.
-    val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]()
-
-    // Replace a Exchange duplicate with a ReusedExchange
-    def reuse: PartialFunction[Exchange, SparkPlan] = {
-      case exchange: Exchange =>
-        val sameSchema = exchanges.getOrElseUpdate(exchange.schema, 
ArrayBuffer[Exchange]())
-        val samePlan = sameSchema.find { e =>
-          exchange.sameResult(e)
-        }
-        if (samePlan.isDefined) {
-          // Keep the output of this exchange, the following plans require 
that to resolve
-          // attributes.
-          ReusedExchangeExec(exchange.output, samePlan.get)
-        } else {
-          sameSchema += exchange
-          exchange
-        }
-    }
-
-    plan transformUp {
-      case exchange: Exchange => reuse(exchange)
-    } transformAllExpressions {
-      // Lookup inside subqueries for duplicate exchanges
-      case in: InSubqueryExec =>
-        val newIn = in.plan.transformUp {
-          case exchange: Exchange => reuse(exchange)
-        }
-        in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
-    }
-  }

Review comment:
       This is issue 1. from the PR description; it is tested with 
the case `SPARK-32041: No reuse interference inside ReuseExchange` in the new 
`ReuseExchangeAndSubquerySuite`: 
https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR28
   
   Combining the 2 rules is required to fix 2., which is tested with the case 
`SPARK-32041: No reuse interference between ReuseExchange and ReuseSubquery`: 
https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR67




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to