[ https://issues.apache.org/jira/browse/SPARK-40764?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-40764:
------------------------------------

    Assignee: Apache Spark

> Extract partitioning through all children output expressions
> -------------------------------------------------------------
>
>                 Key: SPARK-40764
>                 URL: https://issues.apache.org/jira/browse/SPARK-40764
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Yuming Wang
>            Assignee: Apache Spark
>            Priority: Major
>
> {code:sql}
> WITH web_tv as (
>   select
>     ws_item_sk item_sk,
>     d_date,
>     sum(ws_sales_price) sumws,
>     row_number() over (partition by ws_item_sk order by d_date) rk
>   from
>     web_sales, date_dim
>   where
>     ws_sold_date_sk=d_date_sk
>     and d_month_seq between 1212 and 1212 + 11
>     and ws_item_sk is not NULL
>   group by
>     ws_item_sk, d_date),
> web_v1 as (
>   select
>     v1.item_sk,
>     v1.d_date,
>     v1.sumws,
>     sum(v2.sumws) cume_sales
>   from
>     web_tv v1, web_tv v2
>   where
>     v1.item_sk = v2.item_sk
>     and v1.rk >= v2.rk
>   group by
>     v1.item_sk,
>     v1.d_date,
>     v1.sumws)
> select *
> from web_v1
> {code}
> Before:
> {noformat}
> == Physical Plan ==
> *(13) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, cume_sales#5])
> +- *(13) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[partial_sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, sum#132, isEmpty#133])
>    +- *(13) Project [item_sk#1, d_date#2, sumws#3, sumws#4]
>       +- *(13) SortMergeJoin [item_sk#1], [item_sk#6], Inner, (rk#7 >= rk#8)
>          :- *(6) Sort [item_sk#1 ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(item_sk#1, 5), ENSURE_REQUIREMENTS, [plan_id=1]
>          :     +- *(5) Project [item_sk#1, d_date#2, sumws#3, rk#7]
>          :        +- Window [row_number() windowspecdefinition(ws_item_sk#9, d_date#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#7], [ws_item_sk#9], [d_date#2 ASC NULLS FIRST]
>          :           +- *(4) Sort [ws_item_sk#9 ASC NULLS FIRST, d_date#2 ASC NULLS FIRST], false, 0
>          :              +- Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
>          :                 +- *(3) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[sum(UnscaledValue(ws_sales_price#10))], output=[item_sk#1, d_date#2, sumws#3, ws_item_sk#9])
>          :                    +- Exchange hashpartitioning(ws_item_sk#9, d_date#2, 5), ENSURE_REQUIREMENTS, [plan_id=3]
>          :                       +- *(2) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[partial_sum(UnscaledValue(ws_sales_price#10))], output=[ws_item_sk#9, d_date#2, sum#134])
>          :                          +- *(2) Project [ws_item_sk#9, ws_sales_price#10, d_date#2]
>          :                             +- *(2) BroadcastHashJoin [ws_sold_date_sk#11], [d_date_sk#12], Inner, BuildRight, false
>          :                                :- *(2) Filter isnotnull(ws_item_sk#9)
>          :                                :  +- *(2) ColumnarToRow
>          :                                :     +- FileScan parquet spark_catalog.default.web_sales[ws_item_sk#9,ws_sales_price#10,ws_sold_date_sk#11] Batched: true, DataFilters: [isnotnull(ws_item_sk#9)], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(ws_sold_date_sk#11)], PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int,ws_sales_price:decimal(7,2)>
>          :                                +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
>          :                                   +- *(1) Project [d_date_sk#12, d_date#2]
>          :                                      +- *(1) Filter (((isnotnull(d_month_seq#44) AND (d_month_seq#44 >= 1212)) AND (d_month_seq#44 <= 1223)) AND isnotnull(d_date_sk#12))
>          :                                         +- *(1) ColumnarToRow
>          :                                            +- FileScan parquet spark_catalog.default.date_dim[d_date_sk#12,d_date#2,d_month_seq#44] Batched: true, DataFilters: [isnotnull(d_month_seq#44), (d_month_seq#44 >= 1212), (d_month_seq#44 <= 1223), isnotnull(d_date_..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/parquet-1.13/launcher/spark-warehouse/org.ap..., PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
>          +- *(12) Sort [item_sk#6 ASC NULLS FIRST], false, 0
>             +- Exchange hashpartitioning(item_sk#6, 5), ENSURE_REQUIREMENTS, [plan_id=5]
>                +- *(11) Project [item_sk#1 AS item_sk#6, sumws#3 AS sumws#4, rk#8]
>                   +- Window [row_number() windowspecdefinition(ws_item_sk#70, d_date#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#8], [ws_item_sk#70], [d_date#71 ASC NULLS FIRST]
>                      +- *(10) Sort [ws_item_sk#70 ASC NULLS FIRST, d_date#71 ASC NULLS FIRST], false, 0
>                         +- ReusedExchange [item_sk#1, d_date#71, sumws#3, ws_item_sk#70], Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
> {noformat}
> After:
> {noformat}
> == Physical Plan ==
> *(12) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, cume_sales#5])
> +- *(12) HashAggregate(keys=[item_sk#1, d_date#2, sumws#3], functions=[partial_sum(sumws#4)], output=[item_sk#1, d_date#2, sumws#3, sum#132, isEmpty#133])
>    +- *(12) Project [item_sk#1, d_date#2, sumws#3, sumws#4]
>       +- *(12) SortMergeJoin [item_sk#1], [item_sk#6], Inner, (rk#7 >= rk#8)
>          :- *(6) Sort [item_sk#1 ASC NULLS FIRST], false, 0
>          :  +- Exchange hashpartitioning(item_sk#1, 5), ENSURE_REQUIREMENTS, [plan_id=1]
>          :     +- *(5) Project [item_sk#1, d_date#2, sumws#3, rk#7]
>          :        +- Window [row_number() windowspecdefinition(ws_item_sk#9, d_date#2 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#7], [ws_item_sk#9], [d_date#2 ASC NULLS FIRST]
>          :           +- *(4) Sort [ws_item_sk#9 ASC NULLS FIRST, d_date#2 ASC NULLS FIRST], false, 0
>          :              +- Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
>          :                 +- *(3) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[sum(UnscaledValue(ws_sales_price#10))], output=[item_sk#1, d_date#2, sumws#3, ws_item_sk#9])
>          :                    +- Exchange hashpartitioning(ws_item_sk#9, d_date#2, 5), ENSURE_REQUIREMENTS, [plan_id=3]
>          :                       +- *(2) HashAggregate(keys=[ws_item_sk#9, d_date#2], functions=[partial_sum(UnscaledValue(ws_sales_price#10))], output=[ws_item_sk#9, d_date#2, sum#134])
>          :                          +- *(2) Project [ws_item_sk#9, ws_sales_price#10, d_date#2]
>          :                             +- *(2) BroadcastHashJoin [ws_sold_date_sk#11], [d_date_sk#12], Inner, BuildRight, false
>          :                                :- *(2) Filter isnotnull(ws_item_sk#9)
>          :                                :  +- *(2) ColumnarToRow
>          :                                :     +- FileScan parquet spark_catalog.default.web_sales[ws_item_sk#9,ws_sales_price#10,ws_sold_date_sk#11] Batched: true, DataFilters: [isnotnull(ws_item_sk#9)], Format: Parquet, Location: InMemoryFileIndex(0 paths)[], PartitionFilters: [isnotnull(ws_sold_date_sk#11)], PushedFilters: [IsNotNull(ws_item_sk)], ReadSchema: struct<ws_item_sk:int,ws_sales_price:decimal(7,2)>
>          :                                +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [plan_id=4]
>          :                                   +- *(1) Project [d_date_sk#12, d_date#2]
>          :                                      +- *(1) Filter (((isnotnull(d_month_seq#44) AND (d_month_seq#44 >= 1212)) AND (d_month_seq#44 <= 1223)) AND isnotnull(d_date_sk#12))
>          :                                         +- *(1) ColumnarToRow
>          :                                            +- FileScan parquet spark_catalog.default.date_dim[d_date_sk#12,d_date#2,d_month_seq#44] Batched: true, DataFilters: [isnotnull(d_month_seq#44), (d_month_seq#44 >= 1212), (d_month_seq#44 <= 1223), isnotnull(d_date_..., Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/parquet-1.13/launcher/spark-warehouse/org.ap..., PartitionFilters: [], PushedFilters: [IsNotNull(d_month_seq), GreaterThanOrEqual(d_month_seq,1212), LessThanOrEqual(d_month_seq,1223),..., ReadSchema: struct<d_date_sk:int,d_date:date,d_month_seq:int>
>          +- *(11) Project [item_sk#1 AS item_sk#6, sumws#3 AS sumws#4, rk#8]
>             +- Window [row_number() windowspecdefinition(ws_item_sk#70, d_date#71 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rk#8], [ws_item_sk#70], [d_date#71 ASC NULLS FIRST]
>                +- *(10) Sort [ws_item_sk#70 ASC NULLS FIRST, d_date#71 ASC NULLS FIRST], false, 0
>                   +- ReusedExchange [item_sk#1, d_date#71, sumws#3, ws_item_sk#70], Exchange hashpartitioning(ws_item_sk#9, 5), ENSURE_REQUIREMENTS, [plan_id=2]
> {noformat}
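>
> The two plans differ only on the right side of the SortMergeJoin: the extra {{Exchange hashpartitioning(item_sk#6, 5)}} and {{Sort [item_sk#6 ASC NULLS FIRST]}} are gone. They were redundant because {{item_sk#6}} is just {{ws_item_sk#9}} under two levels of aliasing ({{ws_item_sk item_sk}} in the aggregate, then {{item_sk#1 AS item_sk#6}} in the project), so the reused exchange's hash partitioning and the window's sort already satisfy the join's distribution and ordering requirements. Seeing this requires tracking every output expression under which an input attribute stays visible, not just one alias per attribute. A minimal sketch of that idea (illustrative only, not Spark's actual classes; all names in it are made up):
> {code:scala}
> // Sketch of the SPARK-40764 idea: when a node exposes the same input
> // attribute under several output expressions (bare and/or aliased),
> // propagate the child's partitioning through *all* of them, so a downstream
> // requirement on any of the names can be matched without an extra shuffle.
> object AliasPartitioningSketch {
>
>   case class Attr(name: String)
>
>   sealed trait Partitioning
>   case class HashPartitioning(keys: Seq[Attr], numPartitions: Int) extends Partitioning
>   case object UnknownPartitioning extends Partitioning
>
>   // An output expression: either a bare attribute or an alias of one.
>   sealed trait OutExpr
>   case class Ref(attr: Attr) extends OutExpr
>   case class AliasOf(child: Attr, as: Attr) extends OutExpr
>
>   /** All output names under which each input attribute remains visible. */
>   def visibleNames(output: Seq[OutExpr]): Map[Attr, Seq[Attr]] =
>     output.map {
>       case Ref(a)         => a -> a
>       case AliasOf(c, as) => c -> as
>     }.groupMap(_._1)(_._2)
>
>   /** Rewrite the child's hash partitioning in terms of the node's output.
>    *  One candidate per combination of surviving names; an empty result
>    *  means a partitioning key was projected away. */
>   def extractThroughOutput(child: Partitioning, output: Seq[OutExpr]): Seq[Partitioning] =
>     child match {
>       case HashPartitioning(keys, n) =>
>         val names = visibleNames(output)
>         keys.foldLeft(Seq(Seq.empty[Attr])) { (acc, key) =>
>           for (prefix <- acc; name <- names.getOrElse(key, Seq.empty)) yield prefix :+ name
>         }.map(HashPartitioning(_, n))
>       case _ => Seq(UnknownPartitioning)
>     }
>
>   def main(args: Array[String]): Unit = {
>     // Mirrors the plan above: the aggregate exposes ws_item_sk#9 both bare
>     // and as item_sk#1; a later Project re-aliases item_sk#1 to item_sk#6.
>     val aggOut = Seq(AliasOf(Attr("ws_item_sk#9"), Attr("item_sk#1")), Ref(Attr("ws_item_sk#9")))
>     val step1  = extractThroughOutput(HashPartitioning(Seq(Attr("ws_item_sk#9")), 5), aggOut)
>     println(step1) // partitionings on item_sk#1 *and* ws_item_sk#9
>
>     val projOut = Seq(AliasOf(Attr("item_sk#1"), Attr("item_sk#6")))
>     println(step1.flatMap(extractThroughOutput(_, projOut)))
>     // one candidate is hashpartitioning(item_sk#6, 5), so the Exchange and
>     // Sort on item_sk#6 in the "Before" plan are unnecessary
>   }
> }
> {code}
> With all candidate names reported, {{EnsureRequirements}} can recognize the child as already partitioned and sorted on the join key and skip the extra exchange and sort.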