This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 79e3d0d  [SPARK-35855][SQL] Unify reuse map data structures in non-AQE and AQE rules
79e3d0d is described below

commit 79e3d0d98f884dd1f87ad385c682ba380a60dbc8
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Wed Jun 23 07:20:47 2021 +0000

    [SPARK-35855][SQL] Unify reuse map data structures in non-AQE and AQE rules
    
    ### What changes were proposed in this pull request?
    This PR unifies the reuse map data structures in the non-AQE and AQE rules into a simple
    `Map[<canonicalized plan>, <plan>]`, based on the discussion here:
    https://github.com/apache/spark/pull/28885#discussion_r655073897
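    In essence, both code paths now perform a single `getOrElseUpdate` on a map
    keyed by the canonicalized plan and detect reuse via reference inequality.
    Below is a minimal, self-contained sketch of the pattern; the `Plan` type
    and its string `canonical` field are hypothetical stand-ins for `SparkPlan`
    and `SparkPlan.canonicalized`, not Spark APIs:

        import scala.collection.mutable

        object ReuseSketch {
          // Hypothetical stand-in for a query plan; `canonical` plays the role
          // of SparkPlan.canonicalized (cosmetic differences normalized away).
          case class Plan(canonical: String)

          private val cache = mutable.Map.empty[String, Plan]

          def reuse(plan: Plan): Plan = {
            // The first plan seen with a given canonical form is cached;
            // later, semantically equal plans resolve to that cached instance.
            val cached = cache.getOrElseUpdate(plan.canonical, plan)
            // Reference inequality (`ne`) means this occurrence duplicates an
            // earlier plan, i.e. it is a reuse opportunity.
            if (cached ne plan) println(s"reusing ${plan.canonical}")
            cached
          }

          def main(args: Array[String]): Unit = {
            val a = Plan("scan(t1)")
            val b = Plan("scan(t1)") // equal canonical form, different instance
            assert(reuse(a) eq a)    // first sighting: added to the map
            assert(reuse(b) eq a)    // duplicate: resolved to the cached plan
          }
        }

    In the actual rule, a duplicate is wrapped in `ReusedExchangeExec` or
    `ReusedSubqueryExec` rather than returned directly.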
    
    ### Why are the changes needed?
    The proposed `Map[<canonicalized plan>, <plan>]` is simpler than the
    `Map[<schema>, ArrayBuffer[<plan>]]` currently used in
    `ReuseMap`/`ReuseExchangeAndSubquery` (non-AQE) and is consistent with the
    `ReuseAdaptiveSubquery` (AQE) subquery reuse rule.
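    For contrast, the removed `ReuseMap` (full source in the first hunk of the
    diff below) kept a two-level structure: plans were bucketed by schema, and
    each bucket was scanned linearly with `sameResult`, which canonicalizes on
    demand. A rough sketch of that lookup, again using a hypothetical stand-in
    type (`schema` assumed cheap to compute, `canonical` costly):

        import scala.collection.mutable.{ArrayBuffer, Map}

        object OldReuseSketch {
          // Hypothetical plan type: bucketing by schema avoided canonicalizing
          // plans whose schema never recurs.
          case class OldPlan(schema: String, canonical: String)

          private val bySchema = Map.empty[String, ArrayBuffer[OldPlan]]

          def lookupOrElseAdd(plan: OldPlan): OldPlan = {
            val bucket = bySchema.getOrElseUpdate(plan.schema, ArrayBuffer())
            // Linear scan for a semantically equal plan (the real code used
            // plan.sameResult, which compares canonicalized plans).
            bucket.find(_.canonical == plan.canonical).getOrElse {
              bucket += plan
              plan
            }
          }
        }

    The unified version trades this schema pre-filter for a single lookup on
    the canonical form, matching what `ReuseAdaptiveSubquery` already does.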
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs.
    
    Closes #33021 from peter-toth/SPARK-35855-unify-reuse-map-data-structures.
    
    Authored-by: Peter Toth <peter.t...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/util/ReuseMap.scala | 73 ----------------------
 .../org/apache/spark/sql/util/ReuseMapSuite.scala  | 73 ----------------------
 .../execution/reuse/ReuseExchangeAndSubquery.scala | 29 ++++++---
 .../sql/execution/joins/BroadcastJoinSuite.scala   |  4 +-
 4 files changed, 22 insertions(+), 157 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ReuseMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ReuseMap.scala
deleted file mode 100644
index fbee4f0..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/util/ReuseMap.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import scala.collection.mutable.{ArrayBuffer, Map}
-
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.types.StructType
-
-/**
- * Map of canonicalized plans that can be used to find reuse possibilities.
- *
- * To avoid costly canonicalization of a plan:
- * - we use its schema first to check if it can be replaced to a reused one at all
- * - we insert it into the map of canonicalized plans only when at least 2 have the same schema
- *
- * @tparam T the type of the node we want to reuse
- * @tparam T2 the type of the canonicalized node
- */
-class ReuseMap[T <: T2, T2 <: QueryPlan[T2]] {
-  private val map = Map[StructType, ArrayBuffer[T]]()
-
-  /**
-   * Find a matching plan with the same canonicalized form in the map or add the new plan to the
-   * map otherwise.
-   *
-   * @param plan the input plan
-   * @return the matching plan or the input plan
-   */
-  private def lookupOrElseAdd(plan: T): T = {
-    val sameSchema = map.getOrElseUpdate(plan.schema, ArrayBuffer())
-    val samePlan = sameSchema.find(plan.sameResult)
-    if (samePlan.isDefined) {
-      samePlan.get
-    } else {
-      sameSchema += plan
-      plan
-    }
-  }
-
-  /**
-   * Find a matching plan with the same canonicalized form in the map and apply `f` on it or add
-   * the new plan to the map otherwise.
-   *
-   * @param plan the input plan
-   * @param f the function to apply
-   * @tparam T2 the type of the reuse node
-   * @return the matching plan with `f` applied or the input plan
-   */
-  def reuseOrElseAdd[T2 >: T](plan: T, f: T => T2): T2 = {
-    val found = lookupOrElseAdd(plan)
-    if (found eq plan) {
-      plan
-    } else {
-      f(found)
-    }
-  }
-}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ReuseMapSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ReuseMapSuite.scala
deleted file mode 100644
index 6a74aa4..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/ReuseMapSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.util
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
-import org.apache.spark.sql.types.IntegerType
-
-case class TestNode(children: Seq[TestNode], output: Seq[Attribute]) extends LogicalPlan {
-  override protected def withNewChildrenInternal(
-      newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = copy(children = children)
-}
-case class TestReuseNode(child: LogicalPlan) extends UnaryNode {
-  override def output: Seq[Attribute] = child.output
-
-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(child = newChild)
-}
-
-class ReuseMapSuite extends SparkFunSuite {
-  private val leafNode1 = TestNode(Nil, Seq(AttributeReference("a", IntegerType)()))
-  private val leafNode2 = TestNode(Nil, Seq(AttributeReference("b", IntegerType)()))
-  private val parentNode1 = TestNode(Seq(leafNode1), Seq(AttributeReference("a", IntegerType)()))
-  private val parentNode2 = TestNode(Seq(leafNode2), Seq(AttributeReference("b", IntegerType)()))
-
-  private def reuse(testNode: TestNode) = TestReuseNode(testNode)
-
-  test("no reuse if same instance") {
-    val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
-
-    reuseMap.reuseOrElseAdd(leafNode1, reuse)
-    reuseMap.reuseOrElseAdd(parentNode1, reuse)
-
-    assert(reuseMap.reuseOrElseAdd(leafNode1, reuse) == leafNode1)
-    assert(reuseMap.reuseOrElseAdd(parentNode1, reuse) == parentNode1)
-  }
-
-  test("reuse if different instance with same canonicalized plan") {
-    val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
-    reuseMap.reuseOrElseAdd(leafNode1, reuse)
-    reuseMap.reuseOrElseAdd(parentNode1, reuse)
-
-    assert(reuseMap.reuseOrElseAdd(leafNode1.clone.asInstanceOf[TestNode], reuse) ==
-      reuse(leafNode1))
-    assert(reuseMap.reuseOrElseAdd(parentNode1.clone.asInstanceOf[TestNode], reuse) ==
-      reuse(parentNode1))
-  }
-
-  test("no reuse if different canonicalized plan") {
-    val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
-    reuseMap.reuseOrElseAdd(leafNode1, reuse)
-    reuseMap.reuseOrElseAdd(parentNode1, reuse)
-
-    assert(reuseMap.reuseOrElseAdd(leafNode2, reuse) == leafNode2)
-    assert(reuseMap.reuseOrElseAdd(parentNode2, reuse) == parentNode2)
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala
index 0de8178..471b926 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.execution.reuse
 
+import scala.collection.mutable
+
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern._
 import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, ReusedSubqueryExec, SparkPlan}
 import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
-import org.apache.spark.sql.util.ReuseMap
 
 /**
  * Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
@@ -36,24 +37,34 @@ case object ReuseExchangeAndSubquery extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
     if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
-      val exchanges = new ReuseMap[Exchange, SparkPlan]()
-      val subqueries = new ReuseMap[BaseSubqueryExec, SparkPlan]()
+      val exchanges = mutable.Map.empty[SparkPlan, Exchange]
+      val subqueries = mutable.Map.empty[SparkPlan, BaseSubqueryExec]
 
       def reuse(plan: SparkPlan): SparkPlan = {
         plan.transformUpWithPruning(_.containsAnyPattern(EXCHANGE, PLAN_EXPRESSION)) {
           case exchange: Exchange if conf.exchangeReuseEnabled =>
-            exchanges.reuseOrElseAdd(exchange, ReusedExchangeExec(exchange.output, _))
+            val cachedExchange = exchanges.getOrElseUpdate(exchange.canonicalized, exchange)
+            if (cachedExchange.ne(exchange)) {
+              ReusedExchangeExec(exchange.output, cachedExchange)
+            } else {
+              cachedExchange
+            }
 
           case other =>
             other.transformExpressionsUpWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
               case sub: ExecSubqueryExpression =>
                 val subquery = reuse(sub.plan).asInstanceOf[BaseSubqueryExec]
-                sub.withNewPlan(
-                  if (conf.subqueryReuseEnabled) {
-                    subqueries.reuseOrElseAdd(subquery, ReusedSubqueryExec(_))
+                val newSubquery = if (conf.subqueryReuseEnabled) {
+                  val cachedSubquery = subqueries.getOrElseUpdate(subquery.canonicalized, subquery)
+                  if (cachedSubquery.ne(subquery)) {
+                    ReusedSubqueryExec(cachedSubquery)
                   } else {
-                    subquery
-                  })
+                    cachedSubquery
+                  }
+                } else {
+                  subquery
+                }
+                sub.withNewPlan(newSubquery)
             }
         }
       }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
index 98a1089..92c38ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala
@@ -479,9 +479,9 @@ abstract class BroadcastJoinSuiteBase extends QueryTest with SQLTestUtils
   test("broadcast join where streamed side's output partitioning is 
PartitioningCollection") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "500") {
       val t1 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i1", "j1")
-      val t2 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i2", "j2")
+      val t2 = (0 until 100).map(i => (i % 5, i % 14)).toDF("i2", "j2")
       val t3 = (0 until 20).map(i => (i % 7, i % 11)).toDF("i3", "j3")
-      val t4 = (0 until 100).map(i => (i % 5, i % 13)).toDF("i4", "j4")
+      val t4 = (0 until 100).map(i => (i % 5, i % 15)).toDF("i4", "j4")
 
       // join1 is a sort merge join (shuffle on the both sides).
       val join1 = t1.join(t2, t1("i1") === t2("i2"))
