[jira] [Commented] (SPARK-40382) Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children
[ https://issues.apache.org/jira/browse/SPARK-40382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17617467#comment-17617467 ]

Apache Spark commented on SPARK-40382:
--------------------------------------

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/38250

> Reduce projections in Expand when multiple distinct aggregations have
> semantically equivalent children
> ---------------------------------------------------------------------
>
>                 Key: SPARK-40382
>                 URL: https://issues.apache.org/jira/browse/SPARK-40382
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Major
>             Fix For: 3.4.0
>
>
> In RewriteDistinctAggregates, when grouping aggregate expressions by function
> children, we should treat children that are semantically equivalent as the
> same.
> This proposed change potentially reduces the number of projections in the
> Expand operator added to a plan. In some cases, it may eliminate the need for
> an Expand operator.
> Example: In the following query, the Expand operator creates 3*n rows (where
> n is the number of incoming rows) because it has a projection for function
> children b + 1, 1 + b, and c.
> {noformat}
> create or replace temp view v1 as
> select * from values
>   (1, 2, 3.0),
>   (1, 3, 4.0),
>   (2, 4, 2.5),
>   (2, 3, 1.0)
>   v1(a, b, c);
>
> select
>   a,
>   count(distinct b + 1),
>   avg(distinct 1 + b) filter (where c > 0),
>   sum(c)
> from
>   v1
> group by a;
> {noformat}
> The Expand operator has three projections (each producing a row for each
> incoming row):
> {noformat}
> [a#87, null, null, 0, null, UnscaledValue(c#89)],  <== projection #1 (for regular aggregation)
> [a#87, (b#88 + 1), null, 1, null, null],           <== projection #2 (for distinct aggregation of b + 1)
> [a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]]   <== projection #3 (for distinct aggregation of 1 + b)
> {noformat}
> In reality, the Expand only needs one projection for 1 + b and b + 1, because
> they are semantically equivalent.
> With the proposed change, the Expand operator's projections look like this:
> {noformat}
> [a#67, null, 0, null, UnscaledValue(c#69)],  <== projection #1 (for regular aggregations)
> [a#67, (b#68 + 1), 1, (c#69 > 0.0), null]]   <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
> {noformat}
> With one less projection, Expand produces 2*n rows instead of 3*n rows, but
> still produces the correct result.
> In the case where all distinct aggregates have semantically equivalent
> children, the Expand operator is not needed at all.
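The row multiplication described above can be modeled in a few lines of plain Scala. This is a toy sketch with made-up names (not Spark's `InternalRow`-based implementation): Expand replays each input row once per projection, so its output has n * numProjections rows, and merging two semantically equivalent projections removes n rows.

```scala
// Toy model of the Expand operator: each input row is emitted once per
// projection, so dropping one projection drops n output rows.
object ExpandModel {
  type Row = Seq[Any]

  // Replay every input row through every projection.
  def expand(input: Seq[Row], projections: Seq[Row => Row]): Seq[Row] =
    input.flatMap(row => projections.map(p => p(row)))
}
```

With the four rows of v1 and three projections this yields 12 rows; merging the b + 1 and 1 + b projections into one yields 8.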
> Assume this benchmark:
> {noformat}
> runBenchmark("distinct aggregates") {
>   val N = 20 << 22
>   val benchmark = new Benchmark("distinct aggregates", N, output = output)
>   spark.range(N).selectExpr("id % 100 as k", "id % 10 as id1")
>     .createOrReplaceTempView("test")
>
>   def f1(): Unit = spark.sql(
>     """
>     select
>       k,
>       sum(distinct id1 + 1),
>       count(distinct 1 + id1),
>       avg(distinct 1 + ID1)
>     from
>       test
>     group by k""").noop()
>
>   benchmark.addCase("all semantically equivalent", numIters = 2) { _ =>
>     f1()
>   }
>
>   def f2(): Unit = spark.sql(
>     """
>     select
>       k,
>       sum(distinct id1 + 1),
>       count(distinct 1 + id1),
>       avg(distinct 2 + ID1)
>     from
>       test
>     group by k""").noop()
>
>   benchmark.addCase("some semantically equivalent", numIters = 2) { _ =>
>     f2()
>   }
>
>   def f3(): Unit = spark.sql(
>     """
>     select
>       k,
>       sum(distinct id1 + 1),
>       count(distinct 3 + id1),
>       avg(distinct 2 + ID1)
>     from
>       test
>     group by k""").noop()
>
>   benchmark.addCase("none semantically equivalent", numIters = 2) { _ =>
>     f3()
>   }
>
>   benchmark.run()
> }
> {noformat}
> Before the change:
> {noformat}
> [info] distinct aggregates:          Best Time(ms)  Avg Time(ms)  Stdev(ms)  Rate(M/s)  Per Row(ns)  Relative
> [info] ------------------------------------------------------------------------------------------------------
> [info] all semantically equivalent           14721         14859        195        5.7        175.5      1.0X
> [info] some semantically equivalent          14569         14572          5        5.8        173.7      1.0X
> [info] none semantically equivalent          14408         14488        113        5.8        171.8      1.0X
> {noformat}
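The core idea of the change can be sketched outside Spark: group distinct-aggregate children by a canonical form so that `b + 1` and `1 + b` land in the same group and therefore share one Expand projection. This is a minimal, hypothetical model using a toy `Expr` ADT, not Spark's actual `Expression` API; it only mirrors the idea behind `Expression.semanticEquals`/canonicalization.

```scala
// Toy expression tree standing in for Spark's Expression hierarchy.
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

object Canonicalize {
  // Normalize commutative addition by ordering the operands deterministically,
  // so semantically equivalent trees collapse to the same canonical tree.
  def apply(e: Expr): Expr = e match {
    case Add(l, r) =>
      val (cl, cr) = (apply(l), apply(r))
      if (cl.toString <= cr.toString) Add(cl, cr) else Add(cr, cl)
    case leaf => leaf
  }

  // Grouping by the raw expression would give one group per distinct child;
  // grouping by the canonical form merges semantically equivalent children,
  // i.e. fewer groups means fewer Expand projections.
  def groupCount(children: Seq[Expr]): Int =
    children.groupBy(e => apply(e)).size
}
```

For the children of the example query, `Canonicalize.groupCount(Seq(Add(Attr("b"), Lit(1)), Add(Lit(1), Attr("b")), Attr("c")))` gives 2 groups instead of 3.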
[jira] [Commented] (SPARK-40382) Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children
[ https://issues.apache.org/jira/browse/SPARK-40382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17617466#comment-17617466 ]

Apache Spark commented on SPARK-40382:
--------------------------------------

User 'HyukjinKwon' has created a pull request for this issue:
https://github.com/apache/spark/pull/38250
[jira] [Commented] (SPARK-40382) Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children
[ https://issues.apache.org/jira/browse/SPARK-40382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601554#comment-17601554 ]

Apache Spark commented on SPARK-40382:
--------------------------------------

User 'bersprockets' has created a pull request for this issue:
https://github.com/apache/spark/pull/37825