[ 
https://issues.apache.org/jira/browse/SPARK-48244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GANHONGNAN updated SPARK-48244:
-------------------------------
    Description: 
h1. Demo SQL
{code:java}
select *
from (
    select 
        coalesce(t1.a, t2.a) as a,
        coalesce(t1.b, t2.b) as b,
        t1.c, t2.d
    from t1 full out join t2
        on t1.a = t2.a and t1.b = t2.b
    ) l join t3
    on l.a = t3.a and l.b = t3.b {code}
 
This pattern of sql is *frequently* used in ODS senario. Table {{t1}} stores 
historical data (e.g. business data before today), while {{t2}} stores 
incremental data (e.g. today's business orders). The inner subquery uses {{t2}} 
upsert {{{}t1{}}}and get full data until today.
h1. Current Physical Plan

!image-2024-05-12-23-23-38-998.png|width=398,height=319!
 
h1. *Equivalent* Optimized Plan

!image-2024-05-12-23-24-00-375.png|width=314,height=278!
h2. Benefit

avoid shuffling a large amount of data.
 
 
h1. Proof of *Equivalence*

Assume parallelism is P, * 
It is necessary to prove that the RDD PartitionId where the data (l.a, l.b, 
xxx) after Join is located is already equal to {{{}hash(l.a, l.b) mod P{}}}, 
because at this time there is no need for HashPartitioning the Join result 
again.
 
|Join match case|coalesce(t1.a, t2.a) as l.a, coalesce(t1.b, t2.b) as 
l.b|hash(l.a, l.b) mod P| * 
 ** *RDD* *PartitionId of (l.a, l.b, xxx)*|Does FullOut Join result 
partitioning satisfies next Join's requiremtn?|
|match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}
 |Because of shuffle caused by first full outer join, partitionId of (t1.a, 
t1.b, xxx) and (t2.a, t2.b, xxx) equals to hash(t1.a, t1.b) mod P ( equals to 
hash(t2.a, t2.b) Mod P)|Yes
 |
|Right miss match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod 
P}}|Because of shuffle caused by first full outer join, partitionId of (t1.a, 
t1.b, xxx) equals to hash(t1.a, t1.b) mod P|Yes|
|Left miss match|Equal to {{(t2.a, t2.b)}}
 |Equal to {{hash(t2.a, t2.b) mod P}}|Because of shuffle caused by first full 
outer join, partitionId of (t2.a, t2.b, xxx) equals to hash(t2.a, t2.b) mod 
P|Yes
 |

 
 
h1. 实现方式

Change ShuffledJoin's outputPartitioning. Currently it is UnknownPartitioning
 
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YzQ1YTFkOTQyOTc1OWUzNjRiNDFiNjZkMzAzYzkyY2FfMHBJYU40T1Nua1gzbTlOeG1VOW1NaEdzY3RDSzJJa1RfVG9rZW46QnV3MmJiRkhnb3dSMVR4VHJSRWNCTmxGbnl2XzE3MTU1MjcwNjY6MTcxNTUzMDY2Nl9WNA|width=754,height=356!

  was:
h1. Demo SQL
{code:java}
select *
from (
    select 
        coalesce(t1.a, t2.a) as a,
        coalesce(t1.b, t2.b) as b,
        t1.c, t2.d
    from t1 full out join t2
        on t1.a = t2.a and t1.b = t2.b
    ) l join t3
    on l.a = t3.a and l.b = t3.b {code}
 
This pattern of sql is *frequently* used in ODS senario. Table {{t1}} stores 
historical data (e.g. business data before today), while {{t2}} stores 
incremental data (e.g. today's business orders). The inner subquery uses {{t2}} 
upsert {{{}t1{}}}and get full data until today.
h1. Current Physical Plan
!image-2024-05-12-23-15-58-367.png|width=509,height=340!
 
h1. *Equivalent* Optimized Plan

!image-2024-05-12-23-16-46-156.png!
h2. Benefit
avoid shuffling a large amount of data.
 
 
h1. Proof of *Equivalence*
Assume parallelism is P, * 
It is necessary to prove that the RDD PartitionId where the data (l.a, l.b, 
xxx) after Join is located is already equal to {{{}hash(l.a, l.b) mod P{}}}, 
because at this time there is no need for HashPartitioning the Join result 
again.
 
|Join match case|coalesce(t1.a, t2.a) as l.a, coalesce(t1.b, t2.b) as 
l.b|hash(l.a, l.b) mod P| ** *RDD* *PartitionId of (l.a, l.b, xxx)*|Does 
FullOut Join result partitioning satisfies next Join's requiremtn?|
|match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}
 |Because of shuffle caused by first full outer join, partitionId of (t1.a, 
t1.b, xxx) and (t2.a, t2.b, xxx) equals to hash(t1.a, t1.b) mod P ( equals to 
hash(t2.a, t2.b) Mod P)|Yes
 |
|Right miss match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod 
P}}|Because of shuffle caused by first full outer join, partitionId of (t1.a, 
t1.b, xxx) equals to hash(t1.a, t1.b) mod P|Yes|
|Left miss match|Equal to {{(t2.a, t2.b)}}
 |Equal to {{hash(t2.a, t2.b) mod P}}|Because of shuffle caused by first full 
outer join, partitionId of (t2.a, t2.b, xxx) equals to hash(t2.a, t2.b) mod 
P|Yes
 |
 
 
h1. 实现方式
Change ShuffledJoin's outputPartitioning. Currently it is UnknownPartitioning
 
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YzQ1YTFkOTQyOTc1OWUzNjRiNDFiNjZkMzAzYzkyY2FfMHBJYU40T1Nua1gzbTlOeG1VOW1NaEdzY3RDSzJJa1RfVG9rZW46QnV3MmJiRkhnb3dSMVR4VHJSRWNCTmxGbnl2XzE3MTU1MjcwNjY6MTcxNTUzMDY2Nl9WNA|width=754,height=356!

        Summary: [improvement]Specify outputPartitioning of Full Outer Join  
(was: [improvement] Keep partitioning of Full Outer Join)

> [improvement]Specify outputPartitioning of Full Outer Join
> ----------------------------------------------------------
>
>                 Key: SPARK-48244
>                 URL: https://issues.apache.org/jira/browse/SPARK-48244
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: GANHONGNAN
>            Priority: Major
>             Fix For: 3.2.1
>
>
> h1. Demo SQL
> {code:java}
> select *
> from (
>     select 
>         coalesce(t1.a, t2.a) as a,
>         coalesce(t1.b, t2.b) as b,
>         t1.c, t2.d
>     from t1 full out join t2
>         on t1.a = t2.a and t1.b = t2.b
>     ) l join t3
>     on l.a = t3.a and l.b = t3.b {code}
>  
> This pattern of sql is *frequently* used in ODS senario. Table {{t1}} stores 
> historical data (e.g. business data before today), while {{t2}} stores 
> incremental data (e.g. today's business orders). The inner subquery uses 
> {{t2}} upsert {{{}t1{}}}and get full data until today.
> h1. Current Physical Plan
> !image-2024-05-12-23-23-38-998.png|width=398,height=319!
>  
> h1. *Equivalent* Optimized Plan
> !image-2024-05-12-23-24-00-375.png|width=314,height=278!
> h2. Benefit
> avoid shuffling a large amount of data.
>  
>  
> h1. Proof of *Equivalence*
> Assume parallelism is P, * 
> It is necessary to prove that the RDD PartitionId where the data (l.a, l.b, 
> xxx) after Join is located is already equal to {{{}hash(l.a, l.b) mod P{}}}, 
> because at this time there is no need for HashPartitioning the Join result 
> again.
>  
> |Join match case|coalesce(t1.a, t2.a) as l.a, coalesce(t1.b, t2.b) as 
> l.b|hash(l.a, l.b) mod P| * 
>  ** *RDD* *PartitionId of (l.a, l.b, xxx)*|Does FullOut Join result 
> partitioning satisfies next Join's requiremtn?|
> |match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}
>  |Because of shuffle caused by first full outer join, partitionId of (t1.a, 
> t1.b, xxx) and (t2.a, t2.b, xxx) equals to hash(t1.a, t1.b) mod P ( equals to 
> hash(t2.a, t2.b) Mod P)|Yes
>  |
> |Right miss match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod 
> P}}|Because of shuffle caused by first full outer join, partitionId of (t1.a, 
> t1.b, xxx) equals to hash(t1.a, t1.b) mod P|Yes|
> |Left miss match|Equal to {{(t2.a, t2.b)}}
>  |Equal to {{hash(t2.a, t2.b) mod P}}|Because of shuffle caused by first full 
> outer join, partitionId of (t2.a, t2.b, xxx) equals to hash(t2.a, t2.b) mod 
> P|Yes
>  |
>  
>  
> h1. 实现方式
> Change ShuffledJoin's outputPartitioning. Currently it is UnknownPartitioning
>  
> !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YzQ1YTFkOTQyOTc1OWUzNjRiNDFiNjZkMzAzYzkyY2FfMHBJYU40T1Nua1gzbTlOeG1VOW1NaEdzY3RDSzJJa1RfVG9rZW46QnV3MmJiRkhnb3dSMVR4VHJSRWNCTmxGbnl2XzE3MTU1MjcwNjY6MTcxNTUzMDY2Nl9WNA|width=754,height=356!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to