[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan

2023-03-31 Thread Jeff Min (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Min updated SPARK-42935:
-
Description: 
The Union plan does not take full advantage of its children's output partitionings 
when those partitionings cannot satisfy the parent plan's required distribution. For 
example, table1 and table2 are both bucketed tables with bucket column id and 
bucket count 100, and we run a row_number window function after unioning the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");

create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 BUCKETS;
insert into table2 values(1, "s3");

set spark.sql.shuffle.partitions=100;
set spark.sql.unionRequiredDistributionPushdown.enabled=true;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}
The physical plan is 
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#28], [id#35], [name#36 DESC NULLS LAST]
  +- Sort [id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST], false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1[id#35,name#36]
           +- FileScan csv spark_catalog.default.table2[id#37,name#38] {code}
 

Although both tables are bucketed by the id column, there is still an Exchange 
node after the Union. The reason is that the Union plan's output partitioning is null.
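To see concretely why the shuffle is avoidable here: with the same bucket column and the same bucket count, a given id is assigned the same bucket index in both tables. The following plain-Python sketch illustrates that invariant (lists stand in for bucket files, and `hash(...) % n` is an illustrative placeholder, not Spark's Murmur3 bucketing hash):

```python
# Model each bucketed table as a list of NUM_BUCKETS partitions; a row
# (id, name) goes to partition hash(id) % NUM_BUCKETS, which is the
# invariant a table bucketed on `id` guarantees.
NUM_BUCKETS = 100

def bucketize(rows, num_buckets=NUM_BUCKETS):
    parts = [[] for _ in range(num_buckets)]
    for row in rows:
        parts[hash(row[0]) % num_buckets].append(row)
    return parts

table1 = bucketize([(1, "s1"), (2, "s2")])
table2 = bucketize([(1, "s3")])

# Zip-union: output partition i is table1's i-th bucket plus table2's
# i-th bucket. Because both sides used the same bucketing, every id ends
# up in exactly one output partition -- the clustered distribution the
# window operator requires, with no exchange.
union_zip = [table1[i] + table2[i] for i in range(NUM_BUCKETS)]

owner = {}
for i, part in enumerate(union_zip):
    for id_, _name in part:
        assert owner.setdefault(id_, i) == i  # each id in one partition
```

If the two tables had different bucket counts, the index invariant would break and a shuffle would still be required.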

We can introduce a new idea to optimize the exchange plan:
 # First, introduce a new RDD composed of parent RDDs that all have the same number of partitions. Its ith partition corresponds to the ith partition of each parent RDD.
 # Then, push the required distribution down to the Union plan's children. For any child whose output partitioning already matches the required distribution, that child's shuffle can be eliminated.
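The two steps above can be sketched outside Spark as follows. `ZippedUnion` is a hypothetical stand-in for the proposed RDD, not an actual Spark class (Spark core's existing `PartitionerAwareUnionRDD` is similar in spirit); it only illustrates the "output partition i reads partition i of every parent" contract:

```python
# Hypothetical model of the proposed union RDD: all parents must expose
# the same number of partitions, and output partition i is built by
# concatenating partition i of every parent (no data movement).
class ZippedUnion:
    def __init__(self, *parents):
        sizes = {len(p) for p in parents}
        assert len(sizes) == 1, "parents must have equal partition counts"
        self.parents = parents
        self.num_partitions = sizes.pop()

    def compute(self, i):
        # Partition i of the union = parent1[i] ++ parent2[i] ++ ...
        out = []
        for parent in self.parents:
            out.extend(parent[i])
        return out

# Two parents already hash-clustered by key into 2 partitions each.
a = [[("k1", 1)], [("k2", 2)]]
b = [[("k1", 3)], []]
u = ZippedUnion(a, b)
parts = [u.compute(i) for i in range(u.num_partitions)]
# All "k1" rows land in one partition, so a downstream operator that
# requires clustering by key needs no exchange.
```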

After these changes, the physical plan no longer contains an exchange:
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window [row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS id_row_number#0], [id#7], [name#8 DESC NULLS LAST]
  +- Sort [id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST], false, 0
     +- UnionZip ClusteredDistribution(ArrayBuffer(id#7),false,None), ClusteredDistribution(ArrayBuffer(id#9),false,None), hashpartitioning(id#7, 200)
        :- FileScan csv spark_catalog.default.table1[id#7,name#8]
        +- FileScan csv spark_catalog.default.table2[id#9,name#10] {code}
 

 

[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan

2023-03-27 Thread Jeff Min (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Min updated SPARK-42935:
-
Labels: pull-request-available  (was: )

> Optimze shuffle for union spark plan
> 
>
> Key: SPARK-42935
> URL: https://issues.apache.org/jira/browse/SPARK-42935
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Jeff Min
>Priority: Major
>  Labels: pull-request-available
> Fix For: 3.5.0
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan

2023-03-27 Thread Jeff Min (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Min updated SPARK-42935:
-
Description: 
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}
The physical plan is 
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can indroduce a new idea to optimize exchange plan:
 # First introduce a new RDD, it consists of parent rdds that has the same 
partition size. The ith parttition corresponds to ith partition of each parent 
rdd.

 # Then push the required distribution to union plan's children. If any child 
output partitioning matches the required distribution , we can reduce this 
child shuffle operation.

After doing these, the physical plan does not contain exchange shuffle plan
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#0, id#7, name#8 DESC NULLS LAST
  +- Sort id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST, false, 0
     +- UnionZip ClusteredDistribution(ArrayBuffer(id#7),false,None), 
ClusteredDistribution(ArrayBuffer(id#9),false,None), hashpartitioning(id#7, 200)
        :- FileScan csv spark_catalog.default.table1id#7,name#8
        +- FileScan csv spark_catalog.default.table2id#9,name#10 {code}
 

 

  was:
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}
The physical plan is 
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can indroduce a new idea to optimize exchange plan:
 # First introduce a new RDD, it consists of parent rdds that has the same 
partition size. The ith parttition corresponds to ith partition of each parent 
rdd.

 # Then push the required distribution to union plan's children. If any child 
output partitioning matches the required distribution , we can reduce this 
child shuffle operation.

After doing these, the physical plan is
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedprece

[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan

2023-03-27 Thread Jeff Min (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Min updated SPARK-42935:
-
Description: 
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}
The physical plan is 
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can indroduce a new idea to optimize exchange plan:
 # First introduce a new RDD, it consists of parent rdds that has the same 
partition size. The ith parttition corresponds to ith partition of each parent 
rdd.

 # Then push the required distribution to union plan's children. If any child 
output partitioning matches the required distribution , we can reduce this 
child shuffle operation.

After doing these, the physical plan is
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#0, id#7, name#8 DESC NULLS LAST
  +- Sort id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST, false, 0
     +- UnionZip ClusteredDistribution(ArrayBuffer(id#7),false,None), 
ClusteredDistribution(ArrayBuffer(id#9),false,None), hashpartitioning(id#7, 200)
        :- FileScan csv spark_catalog.default.table1id#7,name#8
        +- FileScan csv spark_catalog.default.table2id#9,name#10 {code}
 

 

  was:
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}

The physical plan is 
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can indroduce a new idea to optimize exchange plan:
 # First introduce a new RDD, it consists of parent rdds that has the same 
partition size. The ith parttition corresponds to ith partition of each parent 
rdd.

 # Then push the required distribution to union plan's children. If any child 
output partitioning matches the required distribution , we can reduce this 
child shuffle operation.

After doing these, the physical plan is
{code:bash}
daptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_

[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan

2023-03-27 Thread Jeff Min (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Min updated SPARK-42935:
-
Description: 
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}

The physical plan is 
{code:bash}
Unable to find source-code formatter for language: shell. Available languages 
are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, 
groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, 
php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, 
yamlAdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can indroduce a new idea to optimize exchange plan:
 # First introduce a new RDD, it consists of parent rdds that has the same 
partition size. The ith parttition corresponds to ith partition of each parent 
rdd.

 # Then push the required distribution to union plan's children. If any child 
output partitioning matches the required distribution , we can reduce this 
child shuffle operation.

After doing these, the physical plan is
{code:bash}
daptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#0, id#7, name#8 DESC NULLS LAST
  +- Sort id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST, false, 0
     +- UnionZip ClusteredDistribution(ArrayBuffer(id#7),false,None), 
ClusteredDistribution(ArrayBuffer(id#9),false,None), hashpartitioning(id#7, 200)
        :- FileScan csv spark_catalog.default.table1id#7,name#8
        +- FileScan csv spark_catalog.default.table2id#9,name#10 {code}


 

 

  was:
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}
The physical plan is 
{code:bash}
Unable to find source-code formatter for language: shell. Available languages 
are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, 
groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, 
php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, 
yamlAdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can ind

[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan

2023-03-27 Thread Jeff Min (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Min updated SPARK-42935:
-
Description: 
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}

The physical plan is 
{code:bash}
AdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can indroduce a new idea to optimize exchange plan:
 # First introduce a new RDD, it consists of parent rdds that has the same 
partition size. The ith parttition corresponds to ith partition of each parent 
rdd.

 # Then push the required distribution to union plan's children. If any child 
output partitioning matches the required distribution , we can reduce this 
child shuffle operation.

After doing these, the physical plan is
{code:bash}
daptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#0, id#7, name#8 DESC NULLS LAST
  +- Sort id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST, false, 0
     +- UnionZip ClusteredDistribution(ArrayBuffer(id#7),false,None), 
ClusteredDistribution(ArrayBuffer(id#9),false,None), hashpartitioning(id#7, 200)
        :- FileScan csv spark_catalog.default.table1id#7,name#8
        +- FileScan csv spark_catalog.default.table2id#9,name#10 {code}


 

 

  was:
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}

The physical plan is 
{code:bash}
Unable to find source-code formatter for language: shell. Available languages 
are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, 
groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, 
php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, 
yamlAdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can indroduce a new idea to optimize exchange plan:
 # First introduce a new RDD, it consists of parent rdds that has the same 
partition size. The ith parttition corresponds to ith partition of each parent 
rdd.

 # Then push the required distribution to union plan's children. If any child 
output partitioning matches 

[jira] [Updated] (SPARK-42935) Optimze shuffle for union spark plan

2023-03-27 Thread Jeff Min (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-42935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jeff Min updated SPARK-42935:
-
Description: 
Union plan does not take full advantage of children plan output partitionings 
when output partitoning can't match parent plan's required distribution. For 
example, Table1 and table2 are all bucketed table with bucket column id and 
bucket number 100. We will do row_number window function after union the two 
tables.
{code:sql}
create table table1 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table1 values(1, "s1");
insert into table1 values(2, "s2");
​
create table table2 (id int, name string) using csv CLUSTERED BY (id) INTO 100 
BUCKETS;
insert into table2 values(1, "s3");
​
set spark.sql.shuffle.partitions=100;
explain select *, row_number() over(partition by id order by name desc) 
id_row_number from (select * from table1 union all select * from table2);{code}
The physical plan is 
{code:bash}
Unable to find source-code formatter for language: shell. Available languages 
are: actionscript, ada, applescript, bash, c, c#, c++, cpp, css, erlang, go, 
groovy, haskell, html, java, javascript, js, json, lua, none, nyan, objc, perl, 
php, python, r, rainbow, ruby, scala, sh, sql, swift, visualbasic, xml, 
yamlAdaptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#35, name#36 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#28, id#35, name#36 DESC NULLS LAST
  +- Sort id#35 ASC NULLS FIRST, name#36 DESC NULLS LAST, false, 0
     +- Exchange hashpartitioning(id#35, 100), ENSURE_REQUIREMENTS, [plan_id=88]
        +- Union
           :- FileScan csv spark_catalog.default.table1id#35,name#36
           +- FileScan csv spark_catalog.default.table2id#37,name#38 {code}
 

Although the two tables are bucketed by id column, there's still a exchange 
plan after union.The reason is that union plan's output partitioning is null.

We can indroduce a new idea to optimize exchange plan:
 # First introduce a new RDD, it consists of parent rdds that has the same 
partition size. The ith parttition corresponds to ith partition of each parent 
rdd.

 # Then push the required distribution to union plan's children. If any child 
output partitioning matches the required distribution , we can reduce this 
child shuffle operation.

After doing these, the physical plan is
{code:bash}
daptiveSparkPlan isFinalPlan=false
+- Window row_number() windowspecdefinition(id#7, name#8 DESC NULLS LAST, 
specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
id_row_number#0, id#7, name#8 DESC NULLS LAST
  +- Sort id#7 ASC NULLS FIRST, name#8 DESC NULLS LAST, false, 0
     +- UnionZip ClusteredDistribution(ArrayBuffer(id#7),false,None), 
ClusteredDistribution(ArrayBuffer(id#9),false,None), hashpartitioning(id#7, 200)
        :- FileScan csv spark_catalog.default.table1id#7,name#8
        +- FileScan csv spark_catalog.default.table2id#9,name#10 {code}


 

 

[jira] [Created] (SPARK-42935) Optimze shuffle for union spark plan

2023-03-27 Thread Jeff Min (Jira)
Jeff Min created SPARK-42935:


 Summary: Optimze shuffle for union spark plan
 Key: SPARK-42935
 URL: https://issues.apache.org/jira/browse/SPARK-42935
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.5.0
Reporter: Jeff Min
 Fix For: 3.5.0


--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org