viirya commented on a change in pull request #29360:
URL: https://github.com/apache/spark/pull/29360#discussion_r467601888



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1823,3 +1824,32 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
       empty(ll)
   }
 }
+
+/**
+ * Split [[Expand]] into several Expands if the projection size of the Expand is
+ * larger than the default projection size.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {

Review comment:
       I'm curious how this reduces shuffled data. I think the original idea is that, in cases where Expand would produce huge output, we split it into smaller ones.
   
   But the shuffle happens during Aggregate here, right? Splitting does not change the total amount of shuffled data; it only spreads it across several shuffles. Does it really yield a significant improvement?
   
   I'm curious to see the benchmark results.
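   For what it's worth, a back-of-envelope sketch of the point above (the row counts are hypothetical, not taken from the PR):

```scala
object ShuffleSizeSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical sizes: 1M child rows feeding an Expand with 12 projections.
    val childRows = 1000000L
    val numProjections = 12L

    // Without splitting: one Expand emits childRows * numProjections rows
    // into the aggregate's shuffle.
    val totalExpanded = childRows * numProjections

    // With splitting into groups of 4: three Expands of 4 projections each,
    // each feeding its own Aggregate (and therefore its own shuffle).
    val perSplit = childRows * 4L
    val splitTotal = perSplit * 3L

    // The total shuffled row count is unchanged; it is only partitioned
    // across several smaller shuffles.
    assert(splitTotal == totalExpanded)
    println(s"total=$totalExpanded, split total=$splitTotal")
  }
}
```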

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1823,3 +1824,32 @@ object OptimizeLimitZero extends Rule[LogicalPlan] {
       empty(ll)
   }
 }
+
+/**
+ * Split [[Expand]] into several Expands if the projection size of the Expand is
+ * larger than the default projection size.
+ */
+object SplitAggregateWithExpand extends Rule[LogicalPlan] {
+  private def splitExpand(expand: Expand, num: Int): Seq[Expand] = {
+    val groupedProjections = expand.projections.grouped(num).toList
+    val expands: Seq[Expand] = groupedProjections.map {
+      projectionSeq => Expand(projectionSeq, expand.output, expand.child)
+    }
+    expands
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case a @ Aggregate(_, _, e @ Expand(projections, _, _)) =>
+      if (SQLConf.get.groupingWithUnion &&
+        projections.length > SQLConf.get.groupingExpandProjections) {
+        val num = SQLConf.get.groupingExpandProjections
+        val subExpands = splitExpand(e, num)
+        val aggregates: Seq[Aggregate] = subExpands.map { expand =>
+          Aggregate(a.groupingExpressions, a.aggregateExpressions, expand)
+        }

Review comment:
       Each of these split Expand + Aggregate pairs reads the same data. That is to say, this optimization will read the same data multiple times. 
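   To make this concrete: `splitExpand` partitions the projections with `Seq.grouped`, so every resulting group (and hence every sub-Aggregate) still hangs off the same child plan. A standalone sketch with string stand-ins for the projections (real ones are `Seq[Seq[Expression]]` in Catalyst):

```scala
object GroupedSplitSketch {
  def main(args: Array[String]): Unit = {
    // Stand-ins for Expand's projections.
    val projections = Seq("p1", "p2", "p3", "p4", "p5", "p6", "p7")

    // grouped(3) yields ceil(7/3) = 3 groups; each would become a smaller
    // Expand over the *same* child, so the child is scanned 3 times.
    val groups = projections.grouped(3).toList

    println(groups.length)        // 3 sub-Expands
    println(groups.map(_.length)) // List(3, 3, 1)
  }
}
```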




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
