[ https://issues.apache.org/jira/browse/SPARK-27573?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Josh Rosen updated SPARK-27573:
-------------------------------
    Description: 
When an aggregation requires a shuffle, Spark SQL performs separate partial and final aggregations:
{code:java}
sql("select id % 100 as k, id as v from range(100000)")
  .groupBy("k")
  .sum("v")
  .explain

== Physical Plan ==
*(2) HashAggregate(keys=[k#64L], functions=[sum(v#65L)])
+- Exchange(coordinator id: 2031684357) hashpartitioning(k#64L, 5340), coordinator[target post-shuffle partition size: 67108864]
   +- *(1) HashAggregate(keys=[k#64L], functions=[partial_sum(v#65L)])
      +- *(1) Project [(id#66L % 100) AS k#64L, id#66L AS v#65L]
         +- *(1) Range (0, 100000, step=1, splits=10)
{code}
However, consider what happens if the dataset being aggregated is already pre-partitioned by the aggregate's grouping columns:
{code:java}
sql("select id % 100 as k, id as v from range(100000)")
  .repartition(10, $"k")
  .groupBy("k")
  .sum("v")
  .explain

== Physical Plan ==
*(2) HashAggregate(keys=[k#50L], functions=[sum(v#51L)], output=[k#50L, sum(v)#58L])
+- *(2) HashAggregate(keys=[k#50L], functions=[partial_sum(v#51L)], output=[k#50L, sum#63L])
   +- Exchange(coordinator id: 39015877) hashpartitioning(k#50L, 10), coordinator[target post-shuffle partition size: 67108864]
      +- *(1) Project [(id#52L % 100) AS k#50L, id#52L AS v#51L]
         +- *(1) Range (0, 100000, step=1, splits=10)
{code}
Here, we end up with back-to-back HashAggregate operators that are executed as part of the same stage.

For certain aggregates (e.g. _sum_, _count_), this duplication is unnecessary: we could have performed a single total aggregation instead, since all of the data is already co-located.

The duplicate aggregate is problematic in cases where the aggregate's inputs and outputs are of the same order of magnitude (e.g. counting the number of duplicate records in a dataset where duplicates are extremely rare).

My motivation for this optimization is similar to SPARK-1412: I know that partial aggregation doesn't help for my workload, so I wanted to somehow coerce Spark into skipping the ineffective partial aggregation and jumping directly to total aggregation. I thought that pre-partitioning would accomplish this, but it didn't achieve my goal because of the missing aggregation-collapsing optimization.
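A quick way to confirm the duplicated aggregate described above is to count the HashAggregate operators in the executed physical plan. The snippet below is only an ad-hoc check, not part of the proposal, and it assumes an existing SparkSession named spark:
{code:java}
// Ad-hoc check (not part of the proposal): count the HashAggregate operators
// in the executed plan of the pre-partitioned query. Assumes an existing
// SparkSession named `spark`.
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import spark.implicits._

val df = spark.sql("select id % 100 as k, id as v from range(100000)")
  .repartition(10, $"k")
  .groupBy("k")
  .sum("v")

val numHashAggregates = df.queryExecution.executedPlan.collect {
  case agg: HashAggregateExec => agg
}.size

// Currently this reports 2 (partial + final) even though the data is already
// hash-partitioned by k; with the proposed collapsing it would report 1.
println(numHashAggregates)
{code}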
  was:
When an aggregation requires a shuffle, Spark SQL performs separate partial and final aggregations:
{code:java}
sql("select id % 100 as k, id as v from range(100000)")
  .groupBy("k")
  .sum("v")
  .explain

== Physical Plan ==
*(2) HashAggregate(keys=[k#64L], functions=[sum(v#65L)])
+- Exchange(coordinator id: 2031684357) hashpartitioning(k#64L, 5340), coordinator[target post-shuffle partition size: 67108864]
   +- *(1) HashAggregate(keys=[k#64L], functions=[partial_sum(v#65L)])
      +- *(1) Project [(id#66L % 100) AS k#64L, id#66L AS v#65L]
         +- *(1) Range (0, 100000, step=1, splits=10)
{code}
However, consider what happens if the dataset being aggregated is already pre-partitioned by the aggregate's grouping columns:
{code:java}
sql("select id % 100 as k, id as v from range(100000)")
  .repartition(10, $"k")
  .groupBy("k")
  .sum("v")
  .explain

== Physical Plan ==
*(2) HashAggregate(keys=[k#50L], functions=[sum(v#51L)], output=[k#50L, sum(v)#58L])
+- *(2) HashAggregate(keys=[k#50L], functions=[partial_sum(v#51L)], output=[k#50L, sum#63L])
   +- Exchange(coordinator id: 39015877) hashpartitioning(k#50L, 10), coordinator[target post-shuffle partition size: 67108864]
      +- *(1) Project [(id#52L % 100) AS k#50L, id#52L AS v#51L]
         +- *(1) Range (0, 100000, step=1, splits=10)
{code}
Here, we end up with back-to-back HashAggregate operators that are executed as part of the same stage.

For certain aggregates (e.g. _sum_, _count_), this duplication is unnecessary: we could have performed a single total aggregation instead, since all of the data is already co-located.

The duplicate aggregate is problematic in cases where the aggregate's inputs and outputs are of the same order of magnitude (e.g. counting the number of duplicate records in a dataset where duplicates are extremely rare).

My motivation for this optimization is similar to SPARK-1412: I know that partial aggregation doesn't help for my workload, so I want to somehow coerce Spark into skipping the ineffective partial aggregation and jumping directly to total aggregation.
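To make the proposal more concrete, here is a very rough sketch of the kind of physical-plan rule this issue is asking for. Everything below is illustrative only: the rule name is made up, and a real implementation would also need to handle details such as the final aggregate's buffer attribute references, initialInputBufferOffset, and distinct aggregates, which this sketch ignores.
{code:java}
// Illustrative sketch only (not Spark's actual implementation): collapse a
// Final-mode HashAggregate sitting directly on top of its Partial-mode
// counterpart into a single Complete-mode aggregate over the partial
// aggregate's input. Bookkeeping such as buffer attribute rewriting and
// initialInputBufferOffset is deliberately ignored here.
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Final, Partial}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

object CollapseAdjacentAggregates extends Rule[SparkPlan] {
  override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
    case finalAgg: HashAggregateExec if isFinal(finalAgg) =>
      finalAgg.child match {
        case partialAgg: HashAggregateExec
            if isPartial(partialAgg) &&
               partialAgg.groupingExpressions == finalAgg.groupingExpressions =>
          // Switch every aggregate function to Complete mode and aggregate the
          // partial aggregate's input directly, skipping the partial step.
          finalAgg.copy(
            aggregateExpressions =
              finalAgg.aggregateExpressions.map(_.copy(mode = Complete)),
            child = partialAgg.child)
        case _ => finalAgg
      }
  }

  private def isFinal(agg: HashAggregateExec): Boolean =
    agg.aggregateExpressions.nonEmpty &&
      agg.aggregateExpressions.forall(_.mode == Final)

  private def isPartial(agg: HashAggregateExec): Boolean =
    agg.aggregateExpressions.forall(_.mode == Partial)
}
{code}
Note that this pattern only matches when the final aggregate sits directly on top of the partial one (no Exchange in between), which is exactly the pre-partitioned case shown above.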
> Collapse adjacent physical aggregate operators when possible
> ------------------------------------------------------------
>
>                 Key: SPARK-27573
>                 URL: https://issues.apache.org/jira/browse/SPARK-27573
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 2.4.0
>            Reporter: Josh Rosen
>            Priority: Major