This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a2da015c65 [SPARK-34079][SQL][FOLLOWUP] Improve the readability and 
simplify the code for MergeScalarSubqueries
5a2da015c65 is described below

commit 5a2da015c65b96446f58255916f3de06ece248b2
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Nov 3 20:59:04 2022 +0800

    [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code 
for MergeScalarSubqueries
    
    ### What changes were proposed in this pull request?
    Recently, I read the `MergeScalarSubqueries` rule because it is a feature used 
to improve performance.
    I found the parameters of `ScalarSubqueryReference` hard to understand, so 
I want to add some comments on them.
    
    Additionally, the private method `supportedAggregateMerge` of 
`MergeScalarSubqueries` looks redundant, so this PR simplifies its code.
    
    ### Why are the changes needed?
    Improve the readability and simplify the code for `MergeScalarSubqueries`.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    It just improves the readability and simplifies the code for 
`MergeScalarSubqueries`.
    
    ### How was this patch tested?
    Existing tests.
    
    Closes #38461 from beliefer/SPARK-34079_followup.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../catalyst/optimizer/MergeScalarSubqueries.scala | 31 +++++++++++-----------
 1 file changed, 16 insertions(+), 15 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
index 1cb3f3f157c..43be160f08f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala
@@ -346,22 +346,19 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
   // Only allow aggregates of the same implementation because merging 
different implementations
   // could cause performance regression.
   private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: 
Aggregate) = {
-    val newPlanAggregateExpressions = 
newPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val cachedPlanAggregateExpressions = 
cachedPlan.aggregateExpressions.flatMap(_.collect {
-      case a: AggregateExpression => a
-    })
-    val newPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      
newPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
-    val cachedPlanSupportsHashAggregate = Aggregate.supportsHashAggregate(
-      
cachedPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
+    val aggregateExpressionsSeq = Seq(newPlan, cachedPlan).map { plan =>
+      plan.aggregateExpressions.flatMap(_.collect {
+        case a: AggregateExpression => a
+      })
+    }
+    val Seq(newPlanSupportsHashAggregate, cachedPlanSupportsHashAggregate) =
+      aggregateExpressionsSeq.map(aggregateExpressions => 
Aggregate.supportsHashAggregate(
+        aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)))
     newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate ||
       newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && {
-        val newPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(newPlanAggregateExpressions)
-        val cachedPlanSupportsObjectHashAggregate =
-          Aggregate.supportsObjectHashAggregate(cachedPlanAggregateExpressions)
+        val Seq(newPlanSupportsObjectHashAggregate, 
cachedPlanSupportsObjectHashAggregate) =
+          aggregateExpressionsSeq.map(aggregateExpressions =>
+            Aggregate.supportsObjectHashAggregate(aggregateExpressions))
         newPlanSupportsObjectHashAggregate && 
cachedPlanSupportsObjectHashAggregate ||
           newPlanSupportsObjectHashAggregate == 
cachedPlanSupportsObjectHashAggregate
       }
@@ -394,7 +391,11 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] {
 }
 
 /**
- * Temporal reference to a subquery.
+ * Temporal reference to a cached subquery.
+ *
+ * @param subqueryIndex A subquery index in the cache.
+ * @param headerIndex An index in the output of merged subquery.
+ * @param dataType The dataType of origin scalar subquery.
  */
 case class ScalarSubqueryReference(
     subqueryIndex: Int,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to