[ 
https://issues.apache.org/jira/browse/SPARK-48244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

GANHONGNAN updated SPARK-48244:
-------------------------------
    Summary: [improvement] Specify outputPartitioning of Full Outer Join to 
Avoid Shuffle  (was: [improvement]Specify outputPartitioning of Full Outer Join)

> [improvement] Specify outputPartitioning of Full Outer Join to Avoid Shuffle
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-48244
>                 URL: https://issues.apache.org/jira/browse/SPARK-48244
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: GANHONGNAN
>            Priority: Major
>             Fix For: 3.2.1
>
>
> h1. Demo SQL
> {code:java}
> select *
> from (
>     select 
>         coalesce(t1.a, t2.a) as a,
>         coalesce(t1.b, t2.b) as b,
>         t1.c, t2.d
>     from t1 full outer join t2
>         on t1.a = t2.a and t1.b = t2.b
>     ) l join t3
>     on l.a = t3.a and l.b = t3.b {code}
>  
> This SQL pattern is *frequently* used in ODS scenarios. Table {{t1}} stores 
> historical data (e.g. business data before today), while {{t2}} stores 
> incremental data (e.g. today's business orders). The inner subquery upserts 
> {{t2}} into {{t1}} to produce the full data set up to today.
> h1. Current Physical Plan
>  
> {code:java}
> scan T1                      Scan T2
>     \                         /
> Exchange(t1.a, t1.b)     Exchange(t2.a, t2.b)
>       \                   /
>          \             /
>        Fullouter Join
>   [t1.a, t1.b][t2.a, t2.b]
>               |
>               |
>    Project(coalesce(t1.a, t2.a) as l.a,
>          coalesce(t1.b, t2.b) as l.b)
>                \
>                   \
>                  Exchange(l.a, l.b)      Scan t3
>                           \               /
>                            \         Exchange(t3.a, t3.b)
>                             \       /
>                                Join
>                          [l.a, l.b][t3.a, t3.b]{code}
>  
>  
> h1. *Equivalent* Optimized Plan
>  
> {code:java}
> scan T1                      Scan T2
>     \                         /
> Exchange(t1.a, t1.b)     Exchange(t2.a, t2.b)
>       \                   /
>          \             /
>        Fullouter Join
>   [t1.a, t1.b][t2.a, t2.b]
>               |
>               |
>    Project(coalesce(t1.a, t2.a) as l.a,
>          coalesce(t1.b, t2.b) as l.b)
>                \
>                   \
>                 _ _ _ _ _ _ _ _ _ _ _
>                | Exchange(l.a, l.b)  |     Scan t3
>                |  **remove this **   |
>                 - - - - - - - - - - -
>                           \               /
>                            \       Exchange(t3.a, t3.b)
>                             \        /
>                                Join
>                          [l.a, l.b][t3.a, t3.b]
>   {code}
>  
>  
> h2. Benefit
> Removing this Exchange avoids shuffling the full outer join result, which can 
> be a large amount of data.
>  
>  
> h1. Proof of *Equivalence*
> Assume the parallelism (number of partitions) is P.
> It is necessary to prove that the RDD PartitionId of each row (l.a, l.b, 
> xxx) produced by the full outer join already equals {{hash(l.a, l.b) mod P}}. 
> If it does, there is no need to apply HashPartitioning to the join result 
> again.
>  
> ||Join match case||coalesce(t1.a, t2.a) as l.a, coalesce(t1.b, t2.b) as l.b||hash(l.a, l.b) mod P||RDD PartitionId of (l.a, l.b, xxx)||Does the full outer join's result partitioning satisfy the next join's requirement?||
> |Match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle performed for the first full outer join, the partitionId of (t1.a, t1.b, xxx) and (t2.a, t2.b, xxx) equals hash(t1.a, t1.b) mod P (which equals hash(t2.a, t2.b) mod P)|Yes|
> |No match on the right side|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle performed for the first full outer join, the partitionId of (t1.a, t1.b, xxx) equals hash(t1.a, t1.b) mod P|Yes|
> |No match on the left side|Equal to {{(t2.a, t2.b)}}|Equal to {{hash(t2.a, t2.b) mod P}}|Because of the shuffle performed for the first full outer join, the partitionId of (t2.a, t2.b, xxx) equals hash(t2.a, t2.b) mod P|Yes|
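
The three cases above can be checked with a small simulation. This is only an illustrative sketch in plain Python, not Spark code: the helper names (`shuffle`, `full_outer_join`, `already_partitioned`), the value `P = 8`, and the use of Python's built-in `hash()` in place of Spark's hash function are all assumptions made for the example. Join keys are assumed to be non-null.

```python
P = 8  # assumed number of shuffle partitions


def partition_id(a, b, num_partitions=P):
    # Stand-in for hash(a, b) mod P. Spark uses its own hash function;
    # Python's built-in hash() is used here only for illustration.
    return hash((a, b)) % num_partitions


def shuffle(rows, num_partitions=P):
    """Model an Exchange: route each (a, b, payload) row by hash(a, b) mod P."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[partition_id(row[0], row[1], num_partitions)].append(row)
    return parts


def coalesce(x, y):
    return x if x is not None else y


def full_outer_join(left, right):
    """Full outer join of two co-located partitions on (a, b)."""
    keys = {(r[0], r[1]) for r in left} | {(r[0], r[1]) for r in right}
    out = []
    for key in keys:
        # A missing side is represented by an all-None row.
        ls = [r for r in left if (r[0], r[1]) == key] or [(None, None, None)]
        rs = [r for r in right if (r[0], r[1]) == key] or [(None, None, None)]
        for t1_a, t1_b, c in ls:
            for t2_a, t2_b, d in rs:
                out.append((coalesce(t1_a, t2_a), coalesce(t1_b, t2_b), c, d))
    return out


def join_partitions(t1, t2, num_partitions=P):
    """Shuffle both inputs, then join partition by partition (no re-shuffle)."""
    left_parts = shuffle(t1, num_partitions)
    right_parts = shuffle(t2, num_partitions)
    return [full_outer_join(l, r) for l, r in zip(left_parts, right_parts)]


def already_partitioned(parts):
    """True if every joined row sits in partition hash(l.a, l.b) mod P."""
    n = len(parts)
    return all(partition_id(a, b, n) == pid
               for pid, rows in enumerate(parts)
               for a, b, _c, _d in rows)


t1 = [(1, 1, "c1"), (2, 2, "c2")]  # historical rows: (a, b, c)
t2 = [(2, 2, "d2"), (3, 3, "d3")]  # incremental rows: (a, b, d)
print(already_partitioned(join_partitions(t1, t2)))  # True
```

The sample data covers all three rows of the table: key (2, 2) matches, key (1, 1) has no match on the right side, and key (3, 3) has no match on the left side.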
>  
>  
> h1. Implementation
> Change ShuffledJoin's outputPartitioning for full outer joins. Currently it is 
> UnknownPartitioning.
>  
> !https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YzQ1YTFkOTQyOTc1OWUzNjRiNDFiNjZkMzAzYzkyY2FfMHBJYU40T1Nua1gzbTlOeG1VOW1NaEdzY3RDSzJJa1RfVG9rZW46QnV3MmJiRkhnb3dSMVR4VHJSRWNCTmxGbnl2XzE3MTU1MjcwNjY6MTcxNTUzMDY2Nl9WNA|width=754,height=356!
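
To illustrate why the reported partitioning matters, here is a toy model of the planner's decision, written in plain Python. It is not Spark's implementation: the classes only borrow the names UnknownPartitioning and HashPartitioning, and `full_outer_output_partitioning`, `needs_exchange`, and the `proposed` flag are hypothetical helpers. The sketch assumes an Exchange is inserted whenever the child's reported partitioning does not match the hash partitioning required by the next join.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class UnknownPartitioning:
    num_partitions: int


@dataclass(frozen=True)
class HashPartitioning:
    expressions: Tuple[str, ...]
    num_partitions: int


def full_outer_output_partitioning(left_keys, right_keys, num_partitions, proposed):
    """Partitioning reported by the full outer join in this toy model."""
    if not proposed:
        # Behaviour described in this issue: nothing is known about the output.
        return UnknownPartitioning(num_partitions)
    # Proposed: report hash partitioning on the coalesced join keys.
    exprs = tuple(f"coalesce({l}, {r})" for l, r in zip(left_keys, right_keys))
    return HashPartitioning(exprs, num_partitions)


def needs_exchange(child, required_exprs, num_partitions):
    """True if a shuffle must be inserted above `child` (hypothetical check)."""
    satisfied = (isinstance(child, HashPartitioning)
                 and child.expressions == tuple(required_exprs)
                 and child.num_partitions == num_partitions)
    return not satisfied


# l.a and l.b are aliases of these expressions in the demo SQL.
required = ("coalesce(t1.a, t2.a)", "coalesce(t1.b, t2.b)")
left_keys, right_keys = ("t1.a", "t1.b"), ("t2.a", "t2.b")

current = full_outer_output_partitioning(left_keys, right_keys, 8, proposed=False)
improved = full_outer_output_partitioning(left_keys, right_keys, 8, proposed=True)
print(needs_exchange(current, required, 8))   # True: Exchange(l.a, l.b) is added
print(needs_exchange(improved, required, 8))  # False: the Exchange can be dropped
```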



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
