[ 
https://issues.apache.org/jira/browse/SPARK-40764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Apache Spark reassigned SPARK-40764:
------------------------------------

    Assignee: Apache Spark

> Extract partitioning through all children output expressions
> ------------------------------------------------------------
>
>                 Key: SPARK-40764
>                 URL: https://issues.apache.org/jira/browse/SPARK-40764
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Yuming Wang
>            Assignee: Apache Spark
>            Priority: Major
>
> {code:sql}
> WITH web_tv as (
>     select
>       ws_item_sk item_sk,
>       d_date,
>       sum(ws_sales_price) sumws,
>       row_number() over (partition by ws_item_sk order by d_date) rk
>     from
>       web_sales, date_dim
>     where
>       ws_sold_date_sk=d_date_sk
>         and d_month_seq between 1212 and 1212 + 11
>         and ws_item_sk is not NULL
>     group by
>       ws_item_sk, d_date),
> web_v1 as (
>     select
>       v1.item_sk,
>       v1.d_date,
>       v1.sumws,
>       sum(v2.sumws) cume_sales
>     from
>       web_tv v1, web_tv v2
>     where
>       v1.item_sk = v2.item_sk
>         and v1.rk >= v2.rk
>     group by
>       v1.item_sk,
>       v1.d_date,
>       v1.sumws)
> select *
> from web_v1
> {code}
> Before:
> {noformat}
> == Physical Plan ==
> *(13) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], 
> functions=[sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, cume_sales#5])
> +- *(13) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], 
> functions=[partial_sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, 
> sum#132, isEmpty#133])
>    +- *(13) Project [item_sk#1, d_date#2, sumws#3, sumws#4]
>       +- *(13) SortMergeJoin [item_sk#1], [item_sk#6], Inner, (rk#7 >= rk#8)
>          :- *(6) Sort [item_sk#1 ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(item_sk#1, 5), ENSURE_REQUIREMENTS, 
> [plan_id=1]
>          :     +- *(5) Project [item_sk#1, d_date#2, sumws#3, rk#7]
>          :        +- Window [row_number() windowspecdefinition(ws_item_sk#9, 
> d_date#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
> unboundedpreceding$(), currentrow$())) AS rk#7], [ws_item_sk#9], [d_date#2 
> ASC NULLS FIRST]
>          :           +- *(4) Sort [ws_item_sk#9 ASC NULLS FIRST, d_date#2 ASC 
> NULLS FIRST], false, 0
>          :              +- Exchange hashpartitioning(ws_item_sk#9, 5), 
> ENSURE_REQUIREMENTS, [plan_id=2]
>          :                 +- *(3) HashAggregate(keys=[ws_item_sk#9, 
> d_date#2], functions=[sum(UnscaledValue(ws_sales_price#10))], 
> output=[item_sk#1, d_date#2, sumws#3, ws_item_sk#9])
>          :                    +- Exchange hashpartitioning(ws_item_sk#9, 
> d_date#2, 5), ENSURE_REQUIREMENTS, [plan_id=3]
>          :                       +- *(2) HashAggregate(keys=[ws_item_sk#9, 
> d_date#2], functions=[partial_sum(UnscaledValue(ws_sales_price#10))], 
> output=[ws_item_sk#9, d_date#2, sum#134])
>          :                          +- *(2) Project [ws_item_sk#9, 
> ws_sales_price#10, d_date#2]
>          :                             +- *(2) BroadcastHashJoin 
> [ws_sold_date_sk#11], [d_date_sk#12], Inner, BuildRight, false
>          :                                :- *(2) Filter 
> isnotnull(ws_item_sk#9)
>          :                                :  +- *(2) ColumnarToRow
>          :                                :     +- FileScan parquet 
> spark_catalog.default.web_sales[ws_item_sk#9,ws_sales_price#10,ws_sold_date_sk#11]
>  Batched: true, DataFilters: [isnotnull(ws_item_sk#9)], Format: Parquet, 
> Location: InMemoryFileIndex(0 paths)[], PartitionFilters: 
> [isnotnull(ws_sold_date_sk#11)], PushedFilters: [IsNotNull(ws_item_sk)], 
> ReadSchema: struct<ws_item_sk:int,ws_sales_price:decimal(7,2)>
>          :                                +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), 
> [plan_id=4]
>          :                                   +- *(1) Project [d_date_sk#12, 
> d_date#2]
>          :                                      +- *(1) Filter 
> (((isnotnull(d_month_seq#44) AND (d_month_seq#44 >= 1212)) AND 
> (d_month_seq#44 <= 1223)) AND isnotnull(d_date_sk#12))
>          :                                         +- *(1) ColumnarToRow
>          :                                            +- FileScan parquet 
> spark_catalog.default.date_dim[d_date_sk#12,d_date#2,d_month_seq#44] Batched: 
> true, DataFilters: [isnotnull(d_month_seq#44), (d_month_seq#44 >= 1212), 
> (d_month_seq#44 <= 1223), isnotnull(d_date_..., Format: Parquet, Location: 
> InMemoryFileIndex(1 
> paths)[file:/Users/yumwang/spark/parquet-1.13/launcher/spark-warehouse/org.ap...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), 
> GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., 
> ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
>          +- *(12) Sort [item_sk#6 ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(item_sk#6, 5), ENSURE_REQUIREMENTS, 
> [plan_id=5]
>                +- *(11) Project [item_sk#1 AS item_sk#6, sumws#3 AS sumws#4, 
> rk#8]
>                   +- Window [row_number() windowspecdefinition(ws_item_sk#70, 
> d_date#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
> unboundedpreceding$(), currentrow$())) AS rk#8], [ws_item_sk#70], [d_date#71 
> ASC NULLS FIRST]
>                      +- *(10) Sort [ws_item_sk#70 ASC NULLS FIRST, d_date#71 
> ASC NULLS FIRST], false, 0
>                         +- ReusedExchange [item_sk#1, d_date#71, sumws#3, 
> ws_item_sk#70], Exchange hashpartitioning(ws_item_sk#9, 5), 
> ENSURE_REQUIREMENTS, [plan_id=2]
> {noformat}
> After:
> {noformat}
> == Physical Plan ==
> *(12) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], 
> functions=[sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, cume_sales#5])
> +- *(12) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], 
> functions=[partial_sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, 
> sum#132, isEmpty#133])
>    +- *(12) Project [item_sk#1, d_date#2, sumws#3, sumws#4]
>       +- *(12) SortMergeJoin [item_sk#1], [item_sk#6], Inner, (rk#7 >= rk#8)
>          :- *(6) Sort [item_sk#1 ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(item_sk#1, 5), ENSURE_REQUIREMENTS, 
> [plan_id=1]
>          :     +- *(5) Project [item_sk#1, d_date#2, sumws#3, rk#7]
>          :        +- Window [row_number() windowspecdefinition(ws_item_sk#9, 
> d_date#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
> unboundedpreceding$(), currentrow$())) AS rk#7], [ws_item_sk#9], [d_date#2 
> ASC NULLS FIRST]
>          :           +- *(4) Sort [ws_item_sk#9 ASC NULLS FIRST, d_date#2 ASC 
> NULLS FIRST], false, 0
>          :              +- Exchange hashpartitioning(ws_item_sk#9, 5), 
> ENSURE_REQUIREMENTS, [plan_id=2]
>          :                 +- *(3) HashAggregate(keys=[ws_item_sk#9, 
> d_date#2], functions=[sum(UnscaledValue(ws_sales_price#10))], 
> output=[item_sk#1, d_date#2, sumws#3, ws_item_sk#9])
>          :                    +- Exchange hashpartitioning(ws_item_sk#9, 
> d_date#2, 5), ENSURE_REQUIREMENTS, [plan_id=3]
>          :                       +- *(2) HashAggregate(keys=[ws_item_sk#9, 
> d_date#2], functions=[partial_sum(UnscaledValue(ws_sales_price#10))], 
> output=[ws_item_sk#9, d_date#2, sum#134])
>          :                          +- *(2) Project [ws_item_sk#9, 
> ws_sales_price#10, d_date#2]
>          :                             +- *(2) BroadcastHashJoin 
> [ws_sold_date_sk#11], [d_date_sk#12], Inner, BuildRight, false
>          :                                :- *(2) Filter 
> isnotnull(ws_item_sk#9)
>          :                                :  +- *(2) ColumnarToRow
>          :                                :     +- FileScan parquet 
> spark_catalog.default.web_sales[ws_item_sk#9,ws_sales_price#10,ws_sold_date_sk#11]
>  Batched: true, DataFilters: [isnotnull(ws_item_sk#9)], Format: Parquet, 
> Location: InMemoryFileIndex(0 paths)[], PartitionFilters: 
> [isnotnull(ws_sold_date_sk#11)], PushedFilters: [IsNotNull(ws_item_sk)], 
> ReadSchema: struct<ws_item_sk:int,ws_sales_price:decimal(7,2)>
>          :                                +- BroadcastExchange 
> HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), 
> [plan_id=4]
>          :                                   +- *(1) Project [d_date_sk#12, 
> d_date#2]
>          :                                      +- *(1) Filter 
> (((isnotnull(d_month_seq#44) AND (d_month_seq#44 >= 1212)) AND 
> (d_month_seq#44 <= 1223)) AND isnotnull(d_date_sk#12))
>          :                                         +- *(1) ColumnarToRow
>          :                                            +- FileScan parquet 
> spark_catalog.default.date_dim[d_date_sk#12,d_date#2,d_month_seq#44] Batched: 
> true, DataFilters: [isnotnull(d_month_seq#44), (d_month_seq#44 >= 1212), 
> (d_month_seq#44 <= 1223), isnotnull(d_date_..., Format: Parquet, Location: 
> InMemoryFileIndex(1 
> paths)[file:/Users/yumwang/spark/parquet-1.13/launcher/spark-warehouse/org.ap...,
>  PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), 
> GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., 
> ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
>          +- *(11) Project [item_sk#1 AS item_sk#6, sumws#3 AS sumws#4, rk#8]
>             +- Window [row_number() windowspecdefinition(ws_item_sk#70, 
> d_date#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
> unboundedpreceding$(), currentrow$())) AS rk#8], [ws_item_sk#70], [d_date#71 
> ASC NULLS FIRST]
>                +- *(10) Sort [ws_item_sk#70 ASC NULLS FIRST, d_date#71 ASC 
> NULLS FIRST], false, 0
>                   +- ReusedExchange [item_sk#1, d_date#71, sumws#3, 
> ws_item_sk#70], Exchange hashpartitioning(ws_item_sk#9, 5), 
> ENSURE_REQUIREMENTS, [plan_id=2]
> {noformat}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to