Bruce Robbins created SPARK-40382:
-------------------------------------

             Summary: Reduce projections in Expand when multiple distinct aggregations have semantically equivalent children
                 Key: SPARK-40382
                 URL: https://issues.apache.org/jira/browse/SPARK-40382
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.4.0
            Reporter: Bruce Robbins


In RewriteDistinctAggregates, when grouping aggregate expressions by function 
children, we should treat children that are semantically equivalent as the same.
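To illustrate the idea, here is a minimal Scala sketch of grouping distinct
aggregates by semantically equivalent children. It uses a hypothetical toy
expression tree (Attr, Lit, Add) and hypothetical names (SemanticGrouping,
canonicalize, groupByChild) rather than Catalyst's classes. It approximates
canonicalization in two ways: it lower-cases attribute names (standing in for
id1 and ID1 resolving to the same column) and it puts the operands of the
commutative Add in a stable order. Catalyst's real canonicalization is more
general than this.

```scala
// Hypothetical toy expression tree; not Spark's Catalyst classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object SemanticGrouping {
  // Canonicalize: lower-case attribute names and order the operands of the
  // commutative Add by their string form, so b + 1 and 1 + B look identical.
  def canonicalize(e: Expr): Expr = e match {
    case Attr(n) => Attr(n.toLowerCase)
    case l: Lit  => l
    case Add(l, r) =>
      val (cl, cr) = (canonicalize(l), canonicalize(r))
      if (cl.toString.compareTo(cr.toString) <= 0) Add(cl, cr) else Add(cr, cl)
  }

  // Group (function name, child) pairs by the canonical form of the child,
  // keeping only the function names in each group.
  def groupByChild(aggs: Seq[(String, Expr)]): Map[Expr, Seq[String]] =
    aggs
      .groupBy { case (_, child) => canonicalize(child) }
      .map { case (key, members) => key -> members.map(_._1) }
}
```

With the children from the example query below, count(distinct b + 1) and
avg(distinct 1 + b) fall into a single group, so only one distinct projection
is needed for them.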

This proposed change potentially reduces the number of projections in the 
Expand operator added to a plan. In some cases, it may eliminate the need for 
an Expand operator.

Example: In the following query, the Expand operator creates 3*n rows (where n
is the number of incoming rows) because it has one projection for each of the
distinct function children b + 1 and 1 + b, plus one projection for the
regular aggregate sum(c).

{noformat}
create or replace temp view v1 as
select * from values
(1, 2, 3.0),
(1, 3, 4.0),
(2, 4, 2.5),
(2, 3, 1.0)
v1(a, b, c);

select
  a,
  count(distinct b + 1),
  avg(distinct 1 + b) filter (where c > 0),
  sum(c)
from
  v1
group by a;
{noformat}
The Expand operator has three projections (each producing a row for each 
incoming row):
{noformat}
[a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation)
[a#87, (b#88 + 1), null, 1, null, null],          <== projection #2 (for distinct aggregation of b + 1)
[a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b)
{noformat}
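The 3*n row count follows from how Expand works: it emits one output row per
projection for every input row. The Scala sketch below models this with plain
collections. The names V1Row and ExpandSketch are hypothetical,
UnscaledValue(c) is simplified to c, and None stands in for SQL NULL. Applying
the three projections listed above to the four rows of v1 yields 3 * 4 = 12
rows.

```scala
// Hypothetical model of one row of the temp view v1(a, b, c).
final case class V1Row(a: Int, b: Int, c: Double)

object ExpandSketch {
  // An output row; None plays the role of SQL NULL.
  type OutRow = Seq[Option[Any]]

  // Expand semantics: every input row is emitted once per projection,
  // so the result has input.size * projections.size rows.
  def expand[In](input: Seq[In], projections: Seq[In => OutRow]): Seq[OutRow] =
    for (row <- input; project <- projections) yield project(row)

  val v1: Seq[V1Row] =
    Seq(V1Row(1, 2, 3.0), V1Row(1, 3, 4.0), V1Row(2, 4, 2.5), V1Row(2, 3, 1.0))

  // The three projections from the plan above (UnscaledValue(c) simplified
  // to c); the fourth column holds the projection's group id 0, 1 or 2.
  val threeProjections: Seq[V1Row => OutRow] = Seq(
    // projection #1: regular aggregation
    r => Seq(Some(r.a), None, None, Some(0), None, Some(r.c)),
    // projection #2: distinct aggregation of b + 1
    r => Seq(Some(r.a), Some(r.b + 1), None, Some(1), None, None),
    // projection #3: distinct aggregation of 1 + b, with the filter flag
    r => Seq(Some(r.a), None, Some(1 + r.b), Some(2), Some(r.c > 0.0), None))
}
```

Dropping one of the projections shrinks the output to 2 * 4 = 8 rows, which
is the effect the proposed change has on this query.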
In reality, the Expand needs only one projection for both b + 1 and 1 + b,
because they are semantically equivalent.

With the proposed change, the Expand operator's projections look like this:
{noformat}
[a#67, null, 0, null, UnscaledValue(c#69)],  <== projection #1 (for regular aggregations)
[a#67, (b#68 + 1), 1, (c#69 > 0.0), null]],  <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
{noformat}
With one fewer projection, Expand produces 2*n rows instead of 3*n rows, and
the query still returns the correct result.
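The merged projection can be checked for correctness on the four rows of v1.
The Scala sketch below uses hypothetical names and plain collections, not
Spark's actual multi-phase aggregation. For each value of a, it computes
count(distinct b + 1) and avg(distinct 1 + b) filter (where c > 0) twice: once
from the single merged projection and once directly from the original
children. The two results agree.

```scala
// Hypothetical model of one row of v1(a, b, c).
final case class InRow(a: Int, b: Int, c: Double)
// Output of the merged distinct projection: a, the shared child b + 1,
// and the filter flag c > 0.0.
final case class DistinctRow(a: Int, child: Int, cond: Boolean)

object MergedProjectionCheck {
  val v1: Seq[InRow] =
    Seq(InRow(1, 2, 3.0), InRow(1, 3, 4.0), InRow(2, 4, 2.5), InRow(2, 3, 1.0))

  // One projection serves both distinct aggregates, because b + 1 == 1 + b.
  def mergedProjection(r: InRow): DistinctRow = DistinctRow(r.a, r.b + 1, r.c > 0.0)

  // Note: an empty filtered set would give NaN here, whereas SQL avg gives NULL.
  private def avg(xs: Seq[Int]): Double = xs.sum.toDouble / xs.size

  // Both aggregates read the same projected rows; only avg applies the filter.
  def fromMerged(rows: Seq[InRow]): Map[Int, (Int, Double)] =
    rows.map(mergedProjection).groupBy(_.a).map { case (a, rs) =>
      val countDistinct = rs.map(_.child).distinct.size
      val avgDistinct = avg(rs.filter(_.cond).map(_.child).distinct)
      (a, (countDistinct, avgDistinct))
    }

  // Direct evaluation with the original, separate children b + 1 and 1 + b.
  def direct(rows: Seq[InRow]): Map[Int, (Int, Double)] =
    rows.groupBy(_.a).map { case (a, rs) =>
      val countDistinct = rs.map(r => r.b + 1).distinct.size
      val avgDistinct = avg(rs.filter(_.c > 0).map(r => 1 + r.b).distinct)
      (a, (countDistinct, avgDistinct))
    }
}
```

For v1 both functions return a = 1 -> (2, 3.5) and a = 2 -> (2, 4.5).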

When all distinct aggregates have semantically equivalent children and none of
them has a FILTER clause, the Expand operator is not needed at all.
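As we understand RewriteDistinctAggregates (an assumption, not something this
issue states), the rule leaves the aggregate alone, so no Expand is added,
when there is a single distinct group and no distinct aggregate has a FILTER
clause. The Scala sketch below encodes that condition. DistinctAgg and
needsExpand are hypothetical names, and canonical children are plain strings
for brevity.

```scala
object ExpandDecision {
  // Hypothetical summary of one distinct aggregate: the canonical form of its
  // child and whether it carries a FILTER clause.
  final case class DistinctAgg(canonicalChild: String, hasFilter: Boolean)

  // Assumed condition: Expand is needed only if the distinct aggregates form
  // more than one group, or if any of them has a FILTER clause.
  def needsExpand(aggs: Seq[DistinctAgg]): Boolean =
    aggs.map(_.canonicalChild).distinct.size > 1 || aggs.exists(_.hasFilter)
}
```

Under this assumption, the benchmark's first query (three equivalent children,
no filters) needs no Expand. The example query above still needs one, with two
projections, because its avg(distinct 1 + b) has a FILTER clause.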

Consider the following benchmark:
{noformat}
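    // Assumes this runs inside a Spark SqlBasedBenchmark subclass, which
    // provides runBenchmark, output, and the noop() helper used below.
    runBenchmark("distinct aggregates") {
      val N = 20 << 22 // 83,886,080 rows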

      val benchmark = new Benchmark("distinct aggregates", N, output = output)
      spark.range(N).selectExpr("id % 100 as k", "id % 10 as id1")
        .createOrReplaceTempView("test")

      // All three distinct children (id1 + 1, 1 + id1, 1 + ID1) are equivalent.
      def f1(): Unit = spark.sql(
        """
          select
            k,
            sum(distinct id1 + 1),
            count(distinct 1 + id1),
            avg(distinct 1 + ID1)
          from
            test
          group by k""").noop()

      benchmark.addCase("all semantically equivalent", numIters = 2) { _ =>
        f1()
      }

      // id1 + 1 and 1 + id1 are equivalent; 2 + ID1 is not.
      def f2(): Unit = spark.sql(
        """
          select
            k,
            sum(distinct id1 + 1),
            count(distinct 1 + id1),
            avg(distinct 2 + ID1)
          from
            test
          group by k""").noop()

      benchmark.addCase("some semantically equivalent", numIters = 2) { _ =>
        f2()
      }

      // No two of id1 + 1, 3 + id1 and 2 + ID1 are equivalent.
      def f3(): Unit = spark.sql(
        """
          select
            k,
            sum(distinct id1 + 1),
            count(distinct 3 + id1),
            avg(distinct 2 + ID1)
          from
            test
          group by k""").noop()

      benchmark.addCase("none semantically equivalent", numIters = 2) { _ =>
        f3()
      }
      benchmark.run()
    }
{noformat}
Before the change:
{noformat}
[info] distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] all semantically equivalent                       14721          14859         195          5.7         175.5       1.0X
[info] some semantically equivalent                      14569          14572           5          5.8         173.7       1.0X
[info] none semantically equivalent                      14408          14488         113          5.8         171.8       1.0X
{noformat}
After the proposed change:
{noformat}
[info] distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] all semantically equivalent                        3658           3692          49         22.9          43.6       1.0X
[info] some semantically equivalent                       9124           9214         127          9.2         108.8       0.4X
[info] none semantically equivalent                      14601          14777         250          5.7         174.1       0.3X
{noformat}
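
Comparing best times across the two runs, the change makes the "all
semantically equivalent" case about 4.0x faster (14721 ms vs. 3658 ms) and the
"some semantically equivalent" case about 1.6x faster (14569 ms vs. 9124 ms).
The "none semantically equivalent" case is essentially unchanged (14408 ms vs.
14601 ms). The Relative column in each table compares each case against the
first case of the same run, so the 0.4X and 0.3X values after the change do
not indicate regressions.
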
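The Rate(M/s) and Per Row(ns) columns are consistent with being derived from
the best time and N = 20 << 22 = 83,886,080 rows. The Scala sketch below
(hypothetical helper names) reproduces those columns and the cross-run
speedups from the best times.

```scala
object BenchmarkMath {
  // Rows processed per iteration in the benchmark: 20 << 22 = 83,886,080.
  val N: Long = 20L << 22

  // Millions of rows per second, derived from the best time in milliseconds.
  def rateMPerSec(bestMs: Double): Double = N / (bestMs / 1000.0) / 1e6

  // Nanoseconds per row, derived from the best time in milliseconds.
  def nsPerRow(bestMs: Double): Double = bestMs * 1e6 / N

  // How many times faster the run taking afterMs is than the one taking beforeMs.
  def speedup(beforeMs: Double, afterMs: Double): Double = beforeMs / afterMs

  // Round to one decimal place, as the benchmark table does.
  def round1(x: Double): Double = math.round(x * 10) / 10.0
}
```

For example, a best time of 14721 ms gives 5.7 M rows/s and 175.5 ns per row,
matching the first row of the "before" table.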
--
This message was sent by Atlassian Jira
(v8.20.10#820010)