This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5a2da015c65 [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries 5a2da015c65 is described below commit 5a2da015c65b96446f58255916f3de06ece248b2 Author: Jiaan Geng <belie...@163.com> AuthorDate: Thu Nov 3 20:59:04 2022 +0800 [SPARK-34079][SQL][FOLLOWUP] Improve the readability and simplify the code for MergeScalarSubqueries ### What changes were proposed in this pull request? Recently, I read the `MergeScalarSubqueries` because it is a feature used to improve performance. I found the parameters of ScalarSubqueryReference are hard to understand, so I want to add some comments on it. Additionally, the private method `supportedAggregateMerge` of `MergeScalarSubqueries` looks redundant, so this PR wants to simplify the code. ### Why are the changes needed? Improve the readability and simplify the code for `MergeScalarSubqueries`. ### Does this PR introduce _any_ user-facing change? 'No'. Just improve the readability and simplify the code for `MergeScalarSubqueries`. ### How was this patch tested? Existing tests. Closes #38461 from beliefer/SPARK-34079_followup. 
Authored-by: Jiaan Geng <belie...@163.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../catalyst/optimizer/MergeScalarSubqueries.scala | 31 +++++++++++----------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala index 1cb3f3f157c..43be160f08f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/MergeScalarSubqueries.scala @@ -346,22 +346,19 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { // Only allow aggregates of the same implementation because merging different implementations // could cause performance regression. private def supportedAggregateMerge(newPlan: Aggregate, cachedPlan: Aggregate) = { - val newPlanAggregateExpressions = newPlan.aggregateExpressions.flatMap(_.collect { - case a: AggregateExpression => a - }) - val cachedPlanAggregateExpressions = cachedPlan.aggregateExpressions.flatMap(_.collect { - case a: AggregateExpression => a - }) - val newPlanSupportsHashAggregate = Aggregate.supportsHashAggregate( - newPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)) - val cachedPlanSupportsHashAggregate = Aggregate.supportsHashAggregate( - cachedPlanAggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)) + val aggregateExpressionsSeq = Seq(newPlan, cachedPlan).map { plan => + plan.aggregateExpressions.flatMap(_.collect { + case a: AggregateExpression => a + }) + } + val Seq(newPlanSupportsHashAggregate, cachedPlanSupportsHashAggregate) = + aggregateExpressionsSeq.map(aggregateExpressions => Aggregate.supportsHashAggregate( + aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))) newPlanSupportsHashAggregate && cachedPlanSupportsHashAggregate || 
newPlanSupportsHashAggregate == cachedPlanSupportsHashAggregate && { - val newPlanSupportsObjectHashAggregate = - Aggregate.supportsObjectHashAggregate(newPlanAggregateExpressions) - val cachedPlanSupportsObjectHashAggregate = - Aggregate.supportsObjectHashAggregate(cachedPlanAggregateExpressions) + val Seq(newPlanSupportsObjectHashAggregate, cachedPlanSupportsObjectHashAggregate) = + aggregateExpressionsSeq.map(aggregateExpressions => + Aggregate.supportsObjectHashAggregate(aggregateExpressions)) newPlanSupportsObjectHashAggregate && cachedPlanSupportsObjectHashAggregate || newPlanSupportsObjectHashAggregate == cachedPlanSupportsObjectHashAggregate } @@ -394,7 +391,11 @@ object MergeScalarSubqueries extends Rule[LogicalPlan] { } /** - * Temporal reference to a subquery. + * Temporal reference to a cached subquery. + * + * @param subqueryIndex A subquery index in the cache. + * @param headerIndex An index in the output of merged subquery. + * @param dataType The dataType of origin scalar subquery. */ case class ScalarSubqueryReference( subqueryIndex: Int, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org