[jira] [Assigned] (SPARK-40382) Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children

2022-10-13 Thread Wenchen Fan (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan reassigned SPARK-40382:
---

Assignee: Bruce Robbins

> Reduce projections in Expand when multiple distinct aggregations have 
> semantically equivalent children
> --
>
> Key: SPARK-40382
> URL: https://issues.apache.org/jira/browse/SPARK-40382
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: Bruce Robbins
>Assignee: Bruce Robbins
>Priority: Major
>
> In RewriteDistinctAggregates, when grouping aggregate expressions by function 
> children, we should treat semantically equivalent children as the same 
> expression.
> This proposed change potentially reduces the number of projections in the 
> Expand operator added to a plan. In some cases, it may eliminate the need for 
> an Expand operator.
> Example: In the following query, the Expand operator creates 3*n rows (where 
> n is the number of incoming rows) because it has a projection for each of the 
> function children b + 1, 1 + b, and c.
> {noformat}
> create or replace temp view v1 as
> select * from values
> (1, 2, 3.0),
> (1, 3, 4.0),
> (2, 4, 2.5),
> (2, 3, 1.0)
> v1(a, b, c);
> select
>   a,
>   count(distinct b + 1),
>   avg(distinct 1 + b) filter (where c > 0),
>   sum(c)
> from
>   v1
> group by a;
> {noformat}
> The Expand operator has three projections (each producing a row for each 
> incoming row):
> {noformat}
> [a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for 
> regular aggregation)
> [a#87, (b#88 + 1), null, 1, null, null],  <== projection #2 (for 
> distinct aggregation of b + 1)
> [a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for 
> distinct aggregation of 1 + b)
> {noformat}
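As an aside for readers unfamiliar with Expand, here is a toy, plain-Scala sketch of the behavior (the `Row` type, `expand` function, and projection bodies are illustrative, not Spark internals): an Expand with k projections emits every input row once per projection, turning n rows into k*n rows.

```scala
// Toy model of the Expand operator (illustrative only, not Spark's class):
// every input row is emitted once per projection, so k projections
// multiply the row count by k.
type Row = Seq[Any]

def expand(input: Seq[Row], projections: Seq[Row => Row]): Seq[Row] =
  input.flatMap(row => projections.map(p => p(row)))

// Three projections, mirroring the plan above (expression evaluation and
// the gid column are sketched, not computed faithfully).
val rows: Seq[Row] = Seq(Seq(1, 2, 3.0), Seq(1, 3, 4.0))
val projections: Seq[Row => Row] = Seq(
  r => Seq(r(0), null, null, 0),  // #1: regular aggregation
  r => Seq(r(0), r(1), null, 1),  // #2: distinct on b + 1
  r => Seq(r(0), null, r(1), 2)   // #3: distinct on 1 + b
)
val expanded = expand(rows, projections)  // 2 input rows become 6
```

Dropping one projection, as proposed below, directly cuts the multiplier from 3 to 2.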
> In reality, Expand needs only one projection for 1 + b and b + 1, because 
> they are semantically equivalent.
> With the proposed change, the Expand operator's projections look like this:
> {noformat}
> [a#67, null, 0, null, UnscaledValue(c#69)],  <== projection #1 (for regular 
> aggregations)
> [a#67, (b#68 + 1), 1, (c#69 > 0.0), null]],  <== projection #2 (for distinct 
> aggregation on b + 1 and 1 + b)
> {noformat}
> With one less projection, Expand produces n*2 rows instead of n*3 rows, but 
> still produces the correct result.
> In the case where all distinct aggregates have semantically equivalent 
> children, the Expand operator is not needed at all.
> Assume this benchmark:
> {noformat}
> runBenchmark("distinct aggregates") {
>   val N = 20 << 22
>   val benchmark = new Benchmark("distinct aggregates", N, output = output)
>   spark.range(N).selectExpr("id % 100 as k", "id % 10 as id1")
> .createOrReplaceTempView("test")
>   def f1(): Unit = spark.sql(
> """
>   select
> k,
> sum(distinct id1 + 1),
> count(distinct 1 + id1),
> avg(distinct 1 + ID1)
>   from
> test
>   group by k""").noop()
>   benchmark.addCase("all semantically equivalent", numIters = 2) { _ =>
> f1()
>   }
>   def f2(): Unit = spark.sql(
> """
>   select
> k,
> sum(distinct id1 + 1),
> count(distinct 1 + id1),
> avg(distinct 2 + ID1)
>   from
> test
>   group by k""").noop()
>   benchmark.addCase("some semantically equivalent", numIters = 2) { _ =>
> f2()
>   }
>   def f3(): Unit = spark.sql(
> """
>   select
> k,
> sum(distinct id1 + 1),
> count(distinct 3 + id1),
> avg(distinct 2 + ID1)
>   from
> test
>   group by k""").noop()
>   benchmark.addCase("none semantically equivalent", numIters = 2) { _ =>
> f3()
>   }
>   benchmark.run()
> }
>  {noformat}
> Before the change:
> {noformat}
> [info] distinct aggregates:          Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
> [info] -------------------------------------------------------------------------------------------------------------
> [info] all semantically equivalent           14721          14859         195         5.7         175.5       1.0X
> [info] some semantically equivalent          14569          14572           5         5.8         173.7       1.0X
> [info] none semantically equivalent          14408          14488         113         5.8         171.8       1.0X
> {noformat}
> After the proposed change:
> {noformat}
> [info] distinct aggregates:  Best Time(ms)   Avg 
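The grouping idea described in this issue can be sketched in plain Scala with a toy expression type (illustrative only; Spark's real mechanism is Expression canonicalization inside RewriteDistinctAggregates): commutative operands are put into a deterministic order before grouping, so b + 1 and 1 + b land in one group and can share one Expand projection.

```scala
// Toy expression tree (illustrative, not Spark's Expression hierarchy).
sealed trait Expr
case class Attr(name: String) extends Expr
case class Lit(value: Int) extends Expr
case class Add(left: Expr, right: Expr) extends Expr

// Canonicalize commutative addition by sorting operands deterministically,
// so syntactically different but semantically equal children compare equal.
def canonicalize(e: Expr): Expr = e match {
  case Add(l, r) =>
    val (cl, cr) = (canonicalize(l), canonicalize(r))
    if (cl.toString <= cr.toString) Add(cl, cr) else Add(cr, cl)
  case other => other
}

// Group distinct-aggregate children by canonical form: one Expand projection
// per group, instead of one per syntactic form.
def groupChildren(children: Seq[Expr]): Map[Expr, Seq[Expr]] =
  children.groupBy(canonicalize)

val b = Attr("b")
val groups = groupChildren(Seq(Add(b, Lit(1)), Add(Lit(1), b)))  // one group
```

Under purely syntactic grouping, the two children above would form two groups and therefore two Expand projections; canonical grouping collapses them into one.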

[jira] [Assigned] (SPARK-40382) Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children

2022-09-07 Thread Apache Spark (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-40382:


Assignee: (was: Apache Spark)


[jira] [Assigned] (SPARK-40382) Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children

2022-09-07 Thread Apache Spark (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40382?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Apache Spark reassigned SPARK-40382:


Assignee: Apache Spark
