viirya commented on a change in pull request #29360:
URL: https://github.com/apache/spark/pull/29360#discussion_r467601888
##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1823,3 +1824,32 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
     empty(ll)
   }
 }
+
+/**
+ * Split [[Expand]] into several Expand if the projection size of Expand is larger
+ * than default projection size.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {

Review comment:
   I'm curious how this reduces the amount of shuffled data. I think the original idea is that, in cases where Expand produces huge data, we split it into smaller ones. But the shuffle happens during the Aggregate here, right? Splitting does not change the total amount of shuffled data; it only divides it into several pieces. Does it really bring a significant improvement? I would like to see the benchmark results.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1823,3 +1824,32 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
     empty(ll)
   }
 }
+
+/**
+ * Split [[Expand]] into several Expand if the projection size of Expand is larger
+ * than default projection size.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  private def splitExpand(expand: Expand, num: Int): Seq[Expand] = {
+    val groupedProjections = expand.projections.grouped(num).toList
+    val expands: Seq[Expand] = groupedProjections.map {
+      projectionSeq => Expand(projectionSeq, expand.output, expand.child)
+    }
+    expands
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
+      if (SQLConf.get.groupingWithUnion && projections.length
+        > SQLConf.get.groupingExpandProjections) {
+        val num = SQLConf.get.groupingExpandProjections
+        val subExpands = splitExpand(e, num)
+        val aggregates: Seq[Aggregate] = subExpands.map { expand =>
+          Aggregate(a.groupingExpressions, a.aggregateExpressions, expand)
+        }

Review comment:
   These split Expand + Aggregate operators read the same data.
That is to say, this optimization will read the same data multiple times.
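For intuition, here is a minimal toy sketch of the transformation being discussed. This is plain Scala, not Spark code: `Row`, `Projection`, `expand`, and `aggregate` are made-up stand-ins for Spark's Expand and Aggregate operators. It illustrates why splitting the projection list into groups and unioning the per-group aggregation results can be equivalent to one big Expand + Aggregate, while also showing the reviewer's point that each sub-Expand traverses the same input again:

```scala
object SplitExpandSketch {
  type Row = (String, Int)         // (grouping key, value)
  type Projection = Row => Row     // one Expand projection (e.g. one grouping set)

  val input: Seq[Row] = Seq(("a", 1), ("a", 2), ("b", 3))

  // Four projections standing in for four grouping sets; each maps a row to a
  // distinct key space, so the per-group aggregation results are disjoint.
  val projections: Seq[Projection] = Seq(
    identity,
    { case (k, v) => (k.toUpperCase, v) },
    { case (_, v) => ("ALL", v) },
    { case (k, v) => (k + "!", v) }
  )

  // Expand: duplicate every input row once per projection.
  def expand(projs: Seq[Projection]): Seq[Row] =
    input.flatMap(row => projs.map(p => p(row)))

  // Aggregate: sum values per grouping key.
  def aggregate(rows: Seq[Row]): Map[String, Int] =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // One big Expand followed by one Aggregate.
  val single: Map[String, Int] = aggregate(expand(projections))

  // Split into sub-Expands of 2 projections each, aggregate each, then union.
  // Note each call to expand() re-reads `input` — the same data, multiple times.
  val split: Map[String, Int] = projections.grouped(2).toSeq
    .map(group => aggregate(expand(group)))
    .reduce(_ ++ _)

  def main(args: Array[String]): Unit = {
    assert(single == split)  // same aggregation result either way
    println(single)
  }
}
```

In this sketch the total number of expanded rows (input size × number of projections) is identical in both variants, which mirrors the reviewer's observation: splitting changes how the work is partitioned, not how much data is produced or shuffled.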