Josh Rosen created SPARK-27573:
----------------------------------

             Summary: Collapse adjacent aggregate physical operators when possible
                 Key: SPARK-27573
                 URL: https://issues.apache.org/jira/browse/SPARK-27573
             Project: Spark
          Issue Type: Improvement
          Components: Optimizer, SQL
    Affects Versions: 2.4.0
            Reporter: Josh Rosen


When an aggregation requires a shuffle, Spark SQL performs separate partial and 
final aggregations:
{code:java}
sql("select id % 100 as k, id as v from range(100000)")
  .groupBy("k")
  .sum("v")
  .explain

== Physical Plan ==
*(2) HashAggregate(keys=[k#64L], functions=[sum(v#65L)])
+- Exchange(coordinator id: 2031684357) hashpartitioning(k#64L, 5340), coordinator[target post-shuffle partition size: 67108864]
   +- *(1) HashAggregate(keys=[k#64L], functions=[partial_sum(v#65L)])
      +- *(1) Project [(id#66L % 100) AS k#64L, id#66L AS v#65L]
         +- *(1) Range (0, 100000, step=1, splits=10)
{code}
However, consider what happens if the dataset being aggregated is already 
pre-partitioned by the aggregate's grouping columns:
{code:java}
sql("select id % 100 as k, id as v from range(100000)")
  .repartition(10, $"k")
  .groupBy("k")
  .sum("v")
  .explain

== Physical Plan ==
*(2) HashAggregate(keys=[k#50L], functions=[sum(v#51L)], output=[k#50L, sum(v)#58L])
+- *(2) HashAggregate(keys=[k#50L], functions=[partial_sum(v#51L)], output=[k#50L, sum#63L])
   +- Exchange(coordinator id: 39015877) hashpartitioning(k#50L, 10), coordinator[target post-shuffle partition size: 67108864]
      +- *(1) Project [(id#52L % 100) AS k#50L, id#52L AS v#51L]
         +- *(1) Range (0, 100000, step=1, splits=10) 
{code}
Here, we end up with back-to-back HashAggregate operators that are executed as 
part of the same stage.

For certain aggregates (e.g. _sum_, _count_), this duplication is unnecessary: 
we could have just performed a total aggregation instead!
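Hypothetically, the collapsed plan for the pre-partitioned query above would 
look something like this (illustrative only; I've reused the expression IDs 
from the plan above and elided the coordinator id):
{code:java}
== Physical Plan ==
*(2) HashAggregate(keys=[k#50L], functions=[sum(v#51L)], output=[k#50L, sum(v)#58L])
+- Exchange(coordinator id: ...) hashpartitioning(k#50L, 10), coordinator[target post-shuffle partition size: 67108864]
   +- *(1) Project [(id#52L % 100) AS k#50L, id#52L AS v#51L]
      +- *(1) Range (0, 100000, step=1, splits=10)
{code}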

The duplicate aggregate is problematic in cases where the aggregate's inputs 
and outputs are of the same order of magnitude (e.g. counting the number of 
duplicate records in a dataset where duplicates are extremely rare).

My motivation for this optimization is similar to SPARK-1412: I know that 
partial aggregation doesn't help for my workload, so I want to somehow coerce 
Spark into skipping the ineffective partial aggregation and jumping directly to 
total aggregation.
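For illustration, the collapse could be expressed as a physical-plan rule 
along the following lines. This is a rough, untested sketch against Spark 
2.4-era internals: the HashAggregateExec constructor shape varies across 
versions, and the sketch ignores distinct aggregates, 
SortAggregateExec/ObjectHashAggregateExec, and assumes the grouping keys are 
already attributes of the partial aggregate's input (as in the example above).
{code:java}
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final, Partial}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

// Sketch: when a Final-mode HashAggregate's direct child is the matching
// Partial-mode HashAggregate (i.e. no Exchange in between), replace the pair
// with a single Complete-mode aggregate over the partial aggregate's input.
object CollapseAdjacentAggregates extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    case finalAgg @ HashAggregateExec(_, keys, finalExprs, _, _, _,
           HashAggregateExec(_, childKeys, partialExprs, _, _, _, grandchild))
        if finalExprs.forall(_.mode == Final) &&
           partialExprs.forall(_.mode == Partial) &&
           keys.map(_.toAttribute) == childKeys.map(_.toAttribute) =>
      finalAgg.copy(
        // Complete mode aggregates the raw input rows directly, so there is
        // no partial buffer to merge and no input buffer offset to track.
        aggregateExpressions = finalExprs.map(_.copy(mode = Complete)),
        initialInputBufferOffset = 0,
        child = grandchild)
  }
}
{code}
Presumably such a rule would run among the physical preparation rules, after 
EnsureRequirements has placed the exchanges, since that is the point at which 
we know the partial and final aggregates ended up in the same stage.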


