This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f24f8489f80 [SPARK-41106][SQL] Reduce collection conversion when creating AttributeMap
f24f8489f80 is described below

commit f24f8489f8096c5324a6e7084437ee2238311103
Author: YangJie <yangji...@baidu.com>
AuthorDate: Thu Nov 17 10:53:04 2022 -0600

    [SPARK-41106][SQL] Reduce collection conversion when creating AttributeMap
    
    ### What changes were proposed in this pull request?
    This PR aims to reduce collection conversion when creating AttributeMap in the following ways:
    
    1. Add a new `apply` method to `AttributeMap`
    
    ```
      def apply[A](kvs: Iterable[(Attribute, A)]): AttributeMap[A] = {
        new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
      }
    ```
    
    and use it in applicable scenarios to avoid additional collection conversion.
    
    Although the new `apply` method is more generic, I did not delete the old ones, for backward compatibility.
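    
    For illustration, a minimal sketch (not part of the patch) of how a call site simplifies; `attrs: Seq[Attribute]` and `stats: Seq[ColumnStat]` are hypothetical placeholders:
    
    ```
      // Any Iterable of (Attribute, value) pairs now works directly.
      val pairs: Iterable[(Attribute, ColumnStat)] = attrs.zip(stats)
    
      // Before: an extra .toSeq was needed to satisfy the Seq-based apply.
      val before = AttributeMap(pairs.toSeq)
    
      // After: the new Iterable-based apply takes the pairs as-is.
      val after = AttributeMap(pairs)
    ```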
    
    2. In the following two scenarios, `leftStats.attributeStats ++ rightStats.attributeStats` is an `AttributeMap ++ AttributeMap` operation that already returns a new `AttributeMap`, so this PR removes the redundant collection conversion (see the sketch after the links below):
    
    https://github.com/apache/spark/blob/7d320d784a2d637fd1a8fd0798da3d2a39b4d7cd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala#L86
    
    https://github.com/apache/spark/blob/7d320d784a2d637fd1a8fd0798da3d2a39b4d7cd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala#L148
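    
    A minimal sketch of the `++` behavior this relies on, simplified from the real `AttributeMap` (which extends `Map[Attribute, A]`); shown for illustration only:
    
    ```
      // Simplified: the real class lives in org.apache.spark.sql.catalyst.expressions.
      class AttributeMap[A](val baseMap: Map[ExprId, (Attribute, A)]) {
        // Merging two AttributeMaps combines their exprId-keyed base maps and
        // wraps the result, so no intermediate Seq is needed.
        def ++(other: AttributeMap[A]): AttributeMap[A] =
          new AttributeMap(baseMap ++ other.baseMap)
      }
    ```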
    
    3. In the following scenario, `attributePercentiles` is a `Map`, and there is a corresponding `apply` method that accepts a `Map`, so this PR removes the redundant `toSeq` (see the sketch after the link below):
    
    https://github.com/apache/spark/blob/7d320d784a2d637fd1a8fd0798da3d2a39b4d7cd/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L323
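    
    For illustration, the shape of that change (`attributePercentiles` is the `Map` built in `CommandUtils`):
    
    ```
      // Before: Map -> Seq -> Map round trip.
      AttributeMap(attributePercentiles.toSeq)
    
      // After: the Map-accepting apply (and now the Iterable one) takes it directly.
      AttributeMap(attributePercentiles)
    ```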
    
    ### Why are the changes needed?
    Minor performance improvement by reducing collection conversions.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    - Passed GitHub Actions
    - Manual test
    ```
    dev/change-scala-version.sh 2.13
    build/mvn clean install -Phadoop-3 -Phadoop-cloud -Pmesos -Pyarn -Pkinesis-asl -Phive-thriftserver -Pspark-ganglia-lgpl -Pkubernetes -Phive -Pscala-2.13 -fn
    ```
    
    All tests passed.
    
    Closes #38610 from LuciferYang/AttributeMap.
    
    Lead-authored-by: YangJie <yangji...@baidu.com>
    Co-authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../org/apache/spark/sql/catalyst/expressions/AttributeMap.scala  | 4 ++++
 .../org/apache/spark/sql/catalyst/expressions/AttributeMap.scala  | 4 ++++
 .../org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala     | 4 ++--
 .../spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala      | 2 +-
 .../spark/sql/catalyst/optimizer/NestedColumnAliasing.scala       | 2 +-
 .../scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 2 +-
 .../org/apache/spark/sql/catalyst/optimizer/expressions.scala     | 4 ++--
 .../scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala  | 4 ++--
 .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala     | 2 +-
 .../catalyst/plans/logical/statsEstimation/FilterEstimation.scala | 2 +-
 .../catalyst/plans/logical/statsEstimation/JoinEstimation.scala   | 8 +++-----
 .../apache/spark/sql/execution/columnar/InMemoryRelation.scala    | 2 +-
 .../org/apache/spark/sql/execution/command/CommandUtils.scala     | 2 +-
 13 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
index 3a424574b97..c55c542d957 100644
--- a/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
+++ b/sql/catalyst/src/main/scala-2.12/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
@@ -31,6 +31,10 @@ object AttributeMap {
     new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
   }
 
+  def apply[A](kvs: Iterable[(Attribute, A)]): AttributeMap[A] = {
+    new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
+  }
+
   def empty[A]: AttributeMap[A] = new AttributeMap(Map.empty)
 }
 
diff --git a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
index 1f1df2d2e1d..3d5d6471d26 100644
--- a/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
+++ b/sql/catalyst/src/main/scala-2.13/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
@@ -31,6 +31,10 @@ object AttributeMap {
     new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
   }
 
+  def apply[A](kvs: Iterable[(Attribute, A)]): AttributeMap[A] = {
+    new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
+  }
+
   def empty[A]: AttributeMap[A] = new AttributeMap(Map.empty)
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 2217d783767..08847fbe958 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -1223,13 +1223,13 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {
     // Check if the inputs have changed.
     val references = AttributeMap(plan.references.collect {
       case a if a.resolved => a -> a
-    }.toSeq)
+    })
     def sameButDifferent(a: Attribute): Boolean = {
      references.get(a).exists(b => b.dataType != a.dataType || b.nullable != a.nullable)
     }
     val inputMap = AttributeMap(plan.inputSet.collect {
       case a if a.resolved && sameButDifferent(a) => a -> a
-    }.toSeq)
+    })
     if (inputMap.isEmpty) {
       // Nothing changed.
       plan
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index 53c09a3f68d..60bfc6873e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -209,7 +209,7 @@ object DecorrelateInnerQuery extends PredicateHelper {
     if (duplicates.nonEmpty) {
       val aliasMap = AttributeMap(duplicates.map { dup =>
         dup -> Alias(dup, dup.toString)()
-      }.toSeq)
+      })
       val aliasedExpressions = innerPlan.output.map { ref =>
         aliasMap.getOrElse(ref, ref)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 977e9b1ab13..579afa0439a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -150,7 +150,7 @@ object NestedColumnAliasing {
 
     // A reference attribute can have multiple aliases for nested fields.
     val attrToAliases =
-      AttributeMap(attributeToExtractValuesAndAliases.mapValues(_.map(_._2)).toSeq)
+      AttributeMap(attributeToExtractValuesAndAliases.mapValues(_.map(_._2)))
 
     plan match {
       case Project(projectList, child) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 2bef03d633a..ecb93f6b239 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -587,7 +587,7 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] {
           newChild
         }
 
-        val mapping = AttributeMap(currentNextAttrPairs.toSeq)
+        val mapping = AttributeMap(currentNextAttrPairs)
 
        // Create a an expression cleaning function for nodes that can actually produce redundant
         // aliases, use identity otherwise.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 4542ecb95fb..a59fe36ef42 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -954,7 +954,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
       case j: Join =>
        val (newChildren, foldableMaps) = j.children.map(propagateFoldables).unzip
         val foldableMap = AttributeMap(
-          foldableMaps.foldLeft(Iterable.empty[(Attribute, Alias)])(_ ++ _.baseMap.values).toSeq)
+          foldableMaps.foldLeft(Iterable.empty[(Attribute, Alias)])(_ ++ _.baseMap.values))
         val newJoin =
           replaceFoldable(j.withNewChildren(newChildren).asInstanceOf[Join], foldableMap)
        val missDerivedAttrsSet: AttributeSet = AttributeSet(newJoin.joinType match {
@@ -966,7 +966,7 @@ object FoldablePropagation extends Rule[LogicalPlan] {
         })
        val newFoldableMap = AttributeMap(foldableMap.baseMap.values.filterNot {
           case (attr, _) => missDerivedAttrsSet.contains(attr)
-        }.toSeq)
+        })
         (newJoin, newFoldableMap)
 
      // For other plans, they are not safe to apply foldable propagation, and they should not
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 37331362efa..52619b38098 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -85,7 +85,7 @@ object RewritePredicateSubquery extends Rule[LogicalPlan] with PredicateHelper {
       }
       val rewrites = AttributeMap(duplicates.map { dup =>
         dup -> Alias(dup, dup.toString)()
-      }.toSeq)
+      })
       val aliasedExpressions = subplan.output.map { ref =>
         rewrites.getOrElse(ref, ref)
       }
@@ -644,7 +644,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
           }
         }
     }
-    (newChild, AttributeMap(subqueryAttrMapping.toSeq))
+    (newChild, AttributeMap(subqueryAttrMapping))
   }
 
   private def updateAttrs[E <: Expression](
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index cc62c81b101..0942919b176 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -301,7 +301,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
             .exists(_._2.map(_._2.exprId).distinct.length > 1),
             "Found duplicate rewrite attributes")
 
-          val attributeRewrites = AttributeMap(attrMappingForCurrentPlan.toSeq)
+          val attributeRewrites = AttributeMap(attrMappingForCurrentPlan)
           // Using attrMapping from the children plans to rewrite their parent node.
           // Note that we shouldn't rewrite a node using attrMapping from its sibling nodes.
           newPlan = newPlan.rewriteAttrs(attributeRewrites)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 840a475e67a..24d64399a2a 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -922,6 +922,6 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
       attr -> oriColStat.updateCountStats(
        rowsBeforeFilter, rowsAfterFilter, updatedMap.get(attr.exprId).map(_._2))
     }
-    AttributeMap(newColumnStats.toSeq)
+    AttributeMap(newColumnStats)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index f2b74904f91..30f10970d1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -83,8 +83,7 @@ case class JoinEstimation(join: Join) extends Logging {
       }
 
       // 3. Update statistics based on the output of join
-      val inputAttrStats = AttributeMap(
-        leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq)
+      val inputAttrStats = leftStats.attributeStats ++ rightStats.attributeStats
       val attributesWithStat = join.output.filter(a =>
         inputAttrStats.get(a).map(_.hasCountStats).getOrElse(false))
       val (fromLeft, fromRight) = attributesWithStat.partition(join.left.outputSet.contains(_))
@@ -145,8 +144,7 @@ case class JoinEstimation(join: Join) extends Logging {
 
     case _ =>
      // When there is no equi-join condition, we do estimation like cartesian product.
-      val inputAttrStats = AttributeMap(
-        leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq)
+      val inputAttrStats = leftStats.attributeStats ++ rightStats.attributeStats
       // Propagate the original column stats
       val outputRows = leftStats.rowCount.get * rightStats.rowCount.get
       Some(Statistics(
@@ -213,7 +211,7 @@ case class JoinEstimation(join: Join) extends Logging {
       }
       i += 1
     }
-    (joinCard, AttributeMap(keyStatsAfterJoin.toSeq))
+    (joinCard, AttributeMap(keyStatsAfterJoin))
   }
 
   /** Returns join cardinality and the column stat for this pair of join keys. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 0ace24777b7..98f4a164a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -394,7 +394,7 @@ case class InMemoryRelation(
       newColStats: Map[Attribute, ColumnStat]): Unit = this.synchronized {
     val newStats = statsOfPlanToCache.copy(
       rowCount = Some(rowCount),
-      attributeStats = AttributeMap((statsOfPlanToCache.attributeStats ++ newColStats).toSeq)
+      attributeStats = AttributeMap(statsOfPlanToCache.attributeStats ++ newColStats)
     )
     statsOfPlanToCache = newStats
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 6883f93523b..d847868c0ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -320,7 +320,7 @@ object CommandUtils extends Logging {
         }
       }
     }
-    AttributeMap(attributePercentiles.toSeq)
+    AttributeMap(attributePercentiles)
   }
 
 

