[ https://issues.apache.org/jira/browse/SPARK-23753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-23753:
----------------------------------
    Affects Version/s:     (was: 3.0.0)
                       3.1.0

> [Performance] Group By Push Down through Join
> ---------------------------------------------
>
>                 Key: SPARK-23753
>                 URL: https://issues.apache.org/jira/browse/SPARK-23753
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.1.0
>            Reporter: Ioana Delaney
>            Priority: Major
>
> *Group By push down through Join*
> Another transformation that benefits from RI constraints is Group By push 
> down through joins. The transformation interchanges the order of the group-by 
> and join operations. The benefit of pushing down a group-by is that it may 
> reduce the number of input rows to the join. On the other hand, if the join 
> is very selective, it might make sense to execute the group-by after the 
> join. That is why this transformation is generally applied based on cost or 
> selectivity estimates.
> However, if the join is an RI join, then under certain conditions it is safe 
> to push the group-by operation below the join. An example is shown below.
> {code}
> select c_customer_sk, c_first_name, c_last_name, s_store_sk, s_store_name, 
>        sum(ss.ss_quantity) as store_sales_quantity
> from store_sales ss, date_dim, customer, store
> where d_date_sk = ss_sold_date_sk and
>       c_customer_sk = ss_customer_sk and
>       s_store_sk = ss_store_sk and
>       d_year between 2000 and 2002
> group by c_customer_sk, c_first_name, c_last_name, s_store_sk, s_store_name
> {code}
> The query computes the quantities sold, grouped by columns of the _customer_ 
> and _store_ tables. The tables are joined in a _star schema_. The grouping 
> columns are a superset of the join keys, and the aggregated columns come from 
> the fact table _store_sales_. The group-by operation can therefore be pushed 
> down to the fact table _store_sales_ through the joins with the _customer_ 
> and _store_ tables. The joins affect neither the groups nor the aggregates 
> computed by the pushed-down group-by, since under the RI constraints every 
> tuple in _store_sales_ joins with exactly one tuple in each of the _customer_ 
> and _store_ tables.
> {code}
> select c_customer_sk, c_first_name, c_last_name, s_store_sk, s_store_name,
>        v1.store_sales_quantity
> from customer, store,
>      (select ss_customer_sk, ss_store_sk,
>              sum(ss_quantity) as store_sales_quantity
>       from store_sales, date_dim
>       where d_date_sk = ss_sold_date_sk and
>             d_year between 2000 and 2002
>       group by ss_customer_sk, ss_store_sk) v1
> where c_customer_sk = v1.ss_customer_sk and
>       s_store_sk = v1.ss_store_sk
> {code}
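> As an illustration only (not part of this proposal), the pushed-down shape 
> can also be written by hand with the DataFrame API. Below is a minimal 
> sketch, assuming a Spark session with the TPC-DS tables registered as views; 
> the val names are hypothetical.
> {code}
> import org.apache.spark.sql.functions._
> import spark.implicits._
>
> // Aggregate the fact table first, keyed by the columns that will be joined on.
> val pushed = spark.table("store_sales")
>   .join(spark.table("date_dim"), $"d_date_sk" === $"ss_sold_date_sk")
>   .where($"d_year".between(2000, 2002))
>   .groupBy($"ss_customer_sk", $"ss_store_sk")
>   .agg(sum($"ss_quantity").as("store_sales_quantity"))
>
> // Join the much smaller aggregate result with the dimension tables.
> val result = pushed
>   .join(spark.table("customer"), $"c_customer_sk" === $"ss_customer_sk")
>   .join(spark.table("store"), $"s_store_sk" === $"ss_store_sk")
>   .select($"c_customer_sk", $"c_first_name", $"c_last_name",
>           $"s_store_sk", $"s_store_name", $"store_sales_quantity")
> {code}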
> When the original query is run on a 1TB TPC-DS setup, the pushed-down 
> group-by reduces the number of rows flowing into the join from 1.5 billion 
> to 100 million, and the query execution time drops from about 70 secs to 
> 30 secs, a more than 2x improvement.
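> For anyone reproducing the rewrite, the two query forms can be sanity-checked 
> for equivalence. A sketch, assuming the original and rewritten SQL texts 
> above are bound to the hypothetical strings originalQuery and rewrittenQuery:
> {code}
> val a = spark.sql(originalQuery)
> val b = spark.sql(rewrittenQuery)
>
> // exceptAll preserves duplicates, so both directions being empty means the
> // two results are identical multisets.
> assert(a.exceptAll(b).isEmpty && b.exceptAll(a).isEmpty)
> {code}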


