[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan
[ https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Min updated SPARK-42935:
-----------------------------
    Description:
The Union plan does not take full advantage of its children's output partitioning when its own output partitioning cannot satisfy the parent plan's required distribution.

For example, table1 and table2 are both bucketed tables with bucket column id and 100 buckets. We apply the row_number window function after unioning the two tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");

create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table2 values(1, "s3");

set spark.sql.shuffle.partitions=100;
set spark.sql.unionRequiredDistributionPushdown.enabled=true;
explain select *, row_number() over(partition by id order by name desc) id_row_number from (select * from table1 union all select * from table2);{code}
The physical plan is
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#28], [id#35], [name#36 DESC NULLS LAST]
   +- Sort [id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
         +- Union
            :- FileScan csv spark_catalog.default.table1[id#35,name#36]
            +- FileScan csv spark_catalog.default.table2[id#37,name#38] {code}
Although both tables are bucketed by the id column, there is still an Exchange node after the Union. The reason is that the Union plan's output partitioning is null.

We can introduce a new approach to eliminate this Exchange:
# First, introduce a new RDD that consists of parent RDDs with the same number of partitions. Its ith partition corresponds to the ith partition of each parent RDD.
# Then push the required distribution down to the Union plan's children. If a child's output partitioning matches the required distribution, we can skip the shuffle for that child.

After these changes, the physical plan no longer contains an Exchange (shuffle) node:
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#0], [id#7], [name#8 DESC NULLS LAST]
   +- Sort [id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST], false, 0
      +- UnionZip ClusteredDistribution(ArrayBuffer(id#7),false,None), ClusteredDistribution(ArrayBuffer(id#9),false,None), hashpartitioning(id#7, 200)
         :- FileScan csv spark_catalog.default.table1[id#7,name#8]
         +- FileScan csv spark_catalog.default.table2[id#9,name#10] {code}
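The two steps of the proposal can be sketched in plain Python, without Spark. This is only an illustrative model under stated assumptions, not the implementation from the patch: the names `hash_partition` and `union_zip`, the dictionary layout used for a child, and the use of `id % n` as the bucket function are all hypothetical (Spark's bucketing hashes the key rather than taking it modulo directly).

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]          # (id, name)
Partitions = List[List[Row]]   # one inner list per partition


def hash_partition(rows: List[Row], num_partitions: int) -> Partitions:
    """Stand-in for a shuffle: route each row by id modulo the partition count."""
    parts: Partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[row[0] % num_partitions].append(row)
    return parts


def union_zip(children: List[Dict], required_key: str,
              num_partitions: int) -> Tuple[Partitions, int]:
    """Union children partition by partition; return (partitions, shuffles)."""
    aligned: List[Partitions] = []
    shuffles = 0
    for child in children:
        parts = child["partitions"]
        if child["clustered_by"] == required_key and len(parts) == num_partitions:
            # Step 2: this child already satisfies the required distribution.
            aligned.append(parts)
        else:
            # Only a non-matching child pays for a shuffle.
            rows = [row for part in parts for row in part]
            aligned.append(hash_partition(rows, num_partitions))
            shuffles += 1
    # Step 1: the ith output partition is the ith partition of every child.
    zipped = [[row for parts in aligned for row in parts[i]]
              for i in range(num_partitions)]
    return zipped, shuffles


# Hypothetical data: two children already bucketed by id into 4 partitions.
table1 = {"clustered_by": "id",
          "partitions": hash_partition([(1, "s1"), (2, "s2")], 4)}
table2 = {"clustered_by": "id",
          "partitions": hash_partition([(1, "s3")], 4)}

result, shuffles = union_zip([table1, table2], "id", 4)
print(shuffles)   # number of children that had to be shuffled
print(result[1])  # the partition holding every row with id 1
```

Because each child places a given id at the same partition index, the zipped output in this model is still clustered by id, which is why the Window above it would need no further Exchange.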
[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan
[ https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Min updated SPARK-42935:
-----------------------------
    Labels: pull-request-available  (was: )

> Optimze shuffle for union spark plan
> ------------------------------------
>
>                 Key: SPARK-42935
>                 URL: https://issues.apache.org/jira/browse/SPARK-42935
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.5.0
>            Reporter: Jeff Min
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 3.5.0

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan
[ https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeff Min updated SPARK-42935:
Description:
The Union plan does not take full advantage of its children's output partitioning when its own output partitioning cannot satisfy the parent plan's required distribution.

For example, table1 and table2 are both bucketed tables with bucket column id and 100 buckets. We apply a row_number window function after unioning the two tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table2 values(1, "s3");
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) id_row_number
from (select * from table1 union all select * from table2);
{code}
The physical plan is
{code:shell}
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#28], [id#35], [name#36 DESC NULLS LAST]
   +- Sort [id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
         +- Union
            :- FileScan csv spark_catalog.default.table1[id#35,name#36]
            +- FileScan csv spark_catalog.default.table2[id#37,name#38]
{code}
Although both tables are bucketed by the id column, there is still an Exchange after the Union. The reason is that the Union plan's output partitioning is null.

We can introduce a new approach to eliminate the Exchange:
# First, introduce a new RDD that consists of parent RDDs with the same number of partitions. Its ith partition corresponds to the ith partition of each parent RDD.
# Then push the required distribution down to the Union plan's children. If a child's output partitioning matches the required distribution, we can skip the shuffle for that child.

After these changes, the physical plan is
{code:shell}
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#0], [id#7], [name#8 DESC NULLS LAST]
   +- Sort [id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST], false, 0
      +- UnionZip [ClusteredDistribution(ArrayBuffer(id#7),false,None), ClusteredDistribution(ArrayBuffer(id#9),false,None)], hashpartitioning(id#7, 200)
         :- FileScan csv spark_catalog.default.table1[id#7,name#8]
         +- FileScan csv spark_catalog.default.table2[id#9,name#10]
{code}

was:
The Union plan does not take full advantage of its children's output partitioning when its own output partitioning cannot satisfy the parent plan's required distribution.

For example, table1 and table2 are both bucketed tables with bucket column id and 100 buckets. We apply a row_number window function after unioning the two tables.

create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table2 values(1, "s3");
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) id_row_number
from (select * from table1 union all select * from table2);

The physical plan is

AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#28], [id#35], [name#36 DESC NULLS LAST]
   +- Sort [id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
         +- Union
            :- FileScan csv spark_catalog.default.table1[id#35,name#36]
            +- FileScan csv spark_catalog.default.table2[id#37,name#38]

Although both tables are bucketed by the id column, there is still an Exchange after the Union. The reason is that the Union plan's output partitioning is null.

We can introduce a new approach to eliminate the Exchange:
# First, introduce a new RDD that consists of parent RDDs with the same number of partitions. Its ith partition corresponds to the ith partition of each parent RDD.
# Then push the required distribution down to the Union plan's children. If any child output partitioning matches the required distribut
[jira] [Created] (SPARK-42935) Optimze shuffle for union spark plan
Jeff Min created SPARK-42935:

Summary: Optimze shuffle for union spark plan
Key: SPARK-42935
URL: https://issues.apache.org/jira/browse/SPARK-42935
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 3.5.0
Reporter: Jeff Min
Fix For: 3.5.0

The Union plan does not take full advantage of its children's output partitioning when its own output partitioning cannot satisfy the parent plan's required distribution.

For example, table1 and table2 are both bucketed tables with bucket column id and 100 buckets. We apply a row_number window function after unioning the two tables.

create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table2 values(1, "s3");
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) id_row_number
from (select * from table1 union all select * from table2);

The physical plan is

AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#28], [id#35], [name#36 DESC NULLS LAST]
   +- Sort [id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST], false, 0
      +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
         +- Union
            :- FileScan csv spark_catalog.default.table1[id#35,name#36]
            +- FileScan csv spark_catalog.default.table2[id#37,name#38]

Although both tables are bucketed by the id column, there is still an Exchange after the Union. The reason is that the Union plan's output partitioning is null.

We can introduce a new approach to eliminate the Exchange:
# First, introduce a new RDD that consists of parent RDDs with the same number of partitions. Its ith partition corresponds to the ith partition of each parent RDD.
# Then push the required distribution down to the Union plan's children. If a child's output partitioning matches the required distribution, we can skip the shuffle for that child.

After these changes, the physical plan is

AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#0], [id#7], [name#8 DESC NULLS LAST]
   +- Sort [id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST], false, 0
      +- UnionZip [ClusteredDistribution(ArrayBuffer(id#7),false,None), ClusteredDistribution(ArrayBuffer(id#9),false,None)], hashpartitioning(id#7, 200)
         :- FileScan csv spark_catalog.default.table1[id#7,name#8]
         +- FileScan csv spark_catalog.default.table2[id#9,name#10]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
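
The partition-wise union proposed in the description can be illustrated with a small pure-Python model. This is only a sketch under stated assumptions, not Spark code: partitions are modelled as plain lists, hash(id) % n stands in for bucketing, and the names hash_partition, plain_union, union_zip and is_clustered_by_id are hypothetical helpers invented for this illustration. It shows why concatenating the children's partitions loses the clustering on id, while combining the ith partition of every child preserves it.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]          # (id, name), mirroring table1 / table2
Partitions = List[List[Row]]   # one inner list per partition


def hash_partition(rows: List[Row], num_partitions: int) -> Partitions:
    """Hypothetical stand-in for bucketing: a row goes to hash(id) % n."""
    parts: Partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[hash(row[0]) % num_partitions].append(row)
    return parts


def plain_union(children: List[Partitions]) -> Partitions:
    """Model of an ordinary union: the children's partitions are concatenated,
    so the result has the sum of the children's partition counts."""
    return [part for child in children for part in child]


def union_zip(children: List[Partitions]) -> Partitions:
    """Model of the proposed RDD: the ith output partition holds the rows of
    the ith partition of every child. All children must have the same count."""
    if len({len(child) for child in children}) != 1:
        raise ValueError("all children must have the same number of partitions")
    return [[row for part in parts for row in part] for parts in zip(*children)]


def is_clustered_by_id(parts: Partitions) -> bool:
    """True if every id value lives in exactly one partition."""
    home: Dict[int, int] = {}
    for index, part in enumerate(parts):
        for row in part:
            if home.setdefault(row[0], index) != index:
                return False
    return True


# Same rows as the SQL example, with 4 partitions instead of 100 for brevity.
table1 = hash_partition([(1, "s1"), (2, "s2")], 4)
table2 = hash_partition([(1, "s3")], 4)

concatenated = plain_union([table1, table2])
zipped = union_zip([table1, table2])

print(len(concatenated), is_clustered_by_id(concatenated))
print(len(zipped), is_clustered_by_id(zipped))
```

In this model the concatenated result places id 1 in two different partitions, so a window partitioned by id would still need a shuffle, whereas the zipped result keeps all rows for a given id together and the partition count unchanged.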