GANHONGNAN created SPARK-48244:
----------------------------------

             Summary: [improvement] Keep partitioning of Full Outer Join
                 Key: SPARK-48244
                 URL: https://issues.apache.org/jira/browse/SPARK-48244
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.1
            Reporter: GANHONGNAN
             Fix For: 3.2.1


h1. Demo SQL
{code:sql}
select *
from (
    select
        coalesce(t1.a, t2.a) as a,
        coalesce(t1.b, t2.b) as b,
        t1.c, t2.d
    from t1 full outer join t2
        on t1.a = t2.a and t1.b = t2.b
    ) l join t3
    on l.a = t3.a and l.b = t3.b {code}
 
This SQL pattern is *frequently* used in ODS scenarios. Table {{t1}} stores historical data (e.g. business data before today), while {{t2}} stores incremental data (e.g. today's business orders). The inner subquery upserts {{t2}} into {{t1}} to obtain the full data up to today.
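To make the upsert semantics concrete, here is a minimal Python sketch (not Spark code) of what the inner subquery computes. It assumes each table has at most one row per non-null key (a, b) and models a table as a dict from that key to its payload column; the names {{full_outer_upsert}}, {{history}} and {{today}} are illustrative only.

```python
def full_outer_upsert(t1, t2):
    """Model of the inner subquery (hypothetical helper, not a Spark API).

    t1 maps (a, b) -> c (historical rows) and t2 maps (a, b) -> d (today's
    rows). Keys are assumed unique and non-null. Returns (a, b) -> (c, d),
    with None standing in for SQL NULL on the side that has no row.
    """
    merged = {}
    for key in t1.keys() | t2.keys():
        # coalesce(t1.a, t2.a), coalesce(t1.b, t2.b) is simply the key of
        # whichever side has a row (both sides agree when the row matched).
        merged[key] = (t1.get(key), t2.get(key))
    return merged


history = {(1, "x"): "old-1", (2, "y"): "old-2"}  # t1: data before today
today = {(2, "y"): "new-2", (3, "z"): "new-3"}    # t2: today's increment

merged = full_outer_upsert(history, today)
for key in sorted(merged):
    print(key, merged[key])
```

Key (2, 'y') is the matched case, (1, 'x') has no match in {{t2}}, and (3, 'z') has no match in {{t1}}; these are the three cases examined in the proof below.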
h1. Current Physical Plan
!image-2024-05-12-23-15-58-367.png|width=509,height=340!
 
h1. *Equivalent* Optimized Plan

!image-2024-05-12-23-16-46-156.png!
h2. Benefit
Avoids re-shuffling the full outer join result (which contains all data up to today and is typically large) before the join with {{t3}}.
 
 
h1. Proof of *Equivalence*
Assume the number of shuffle partitions (parallelism) is P.
It suffices to prove that every row (l.a, l.b, xxx) produced by the full outer join already resides in the RDD partition whose id is {{hash(l.a, l.b) mod P}}. If so, the join result does not need to be hash-partitioned again before the join with {{t3}}.
 
||Join match case||l.a = coalesce(t1.a, t2.a), l.b = coalesce(t1.b, t2.b)||hash(l.a, l.b) mod P||RDD partition id of (l.a, l.b, xxx)||Does the full outer join's output partitioning satisfy the next join's requirement?||
|Matched|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle introduced by the full outer join, the partition id of (t1.a, t1.b, xxx) and of (t2.a, t2.b, xxx) is {{hash(t1.a, t1.b) mod P}}, which equals {{hash(t2.a, t2.b) mod P}}|Yes|
|No match in {{t2}} (only the {{t1}} row is present)|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle introduced by the full outer join, the partition id of (t1.a, t1.b, xxx) is {{hash(t1.a, t1.b) mod P}}|Yes|
|No match in {{t1}} (only the {{t2}} row is present)|Equal to {{(t2.a, t2.b)}}|Equal to {{hash(t2.a, t2.b) mod P}}|Because of the shuffle introduced by the full outer join, the partition id of (t2.a, t2.b, xxx) is {{hash(t2.a, t2.b) mod P}}|Yes|
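The table's argument can be checked with a small, self-contained Python simulation (a sketch, not Spark code). It hash-partitions both inputs on (a, b) into P partitions, runs the full outer join independently inside each partition, and checks that every output row's coalesced key hashes back to the partition it sits in. Assumptions: {{zlib.crc32}} over the key's {{repr}} stands in for Spark's Murmur3-based hash, both sides use the same hash function and the same P, and keys may contain None (SQL NULL), which, as with SQL equality, never matches.

```python
import random
import zlib

P = 8  # assumed number of shuffle partitions


def partition_id(key):
    # Stand-in for hash(a, b) mod P; Spark itself uses a Murmur3-based hash.
    return zlib.crc32(repr(key).encode()) % P


def shuffle(rows):
    """Hash-partition rows (a, b, payload) on the key (a, b)."""
    parts = [[] for _ in range(P)]
    for a, b, payload in rows:
        parts[partition_id((a, b))].append((a, b, payload))
    return parts


def coalesce(x, y):
    return x if x is not None else y


def full_outer_join(left, right):
    """Join one partition pair on t1.(a, b) = t2.(a, b).

    Yields (t1.a, t1.b, t2.a, t2.b, c, d) with None on the unmatched side.
    As with SQL equality, a key containing None never matches.
    """
    right_by_key = {}
    for a, b, d in right:
        right_by_key.setdefault((a, b), []).append(d)
    matched = set()
    for a, b, c in left:
        key = (a, b)
        if None not in key and key in right_by_key:
            matched.add(key)
            for d in right_by_key[key]:
                yield (a, b, a, b, c, d)           # matched
        else:
            yield (a, b, None, None, c, None)      # no match in t2
    for (a, b), ds in right_by_key.items():
        if (a, b) not in matched:
            for d in ds:
                yield (None, None, a, b, None, d)  # no match in t1


def coalesced_keys_stay_put(t1_rows, t2_rows):
    """True if every row (l.a, l.b, ...) sits in partition hash(l.a, l.b) mod P."""
    left_parts, right_parts = shuffle(t1_rows), shuffle(t2_rows)
    for pid in range(P):
        for t1a, t1b, t2a, t2b, _c, _d in full_outer_join(left_parts[pid], right_parts[pid]):
            la, lb = coalesce(t1a, t2a), coalesce(t1b, t2b)
            if partition_id((la, lb)) != pid:
                return False
    return True


def random_rows(n, seed):
    rng = random.Random(seed)
    values = [None, 0, 1, 2, 3]  # small domain so that many keys collide
    return [(rng.choice(values), rng.choice(values), i) for i in range(n)]


print(coalesced_keys_stay_put(random_rows(200, 1), random_rows(200, 2)))  # True
```

Using {{t1a}} and {{t1b}} instead of the coalesced values would, in general, misplace rows that exist only in {{t2}} (their {{t1}} columns are all None), which is why the left child's partitioning alone cannot be reported for a full outer join.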
 
 
h1. Implementation
Change {{ShuffledJoin}}'s {{outputPartitioning}}. For a full outer join it is currently {{UnknownPartitioning}}.
 
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YzQ1YTFkOTQyOTc1OWUzNjRiNDFiNjZkMzAzYzkyY2FfMHBJYU40T1Nua1gzbTlOeG1VOW1NaEdzY3RDSzJJa1RfVG9rZW46QnV3MmJiRkhnb3dSMVR4VHJSRWNCTmxGbnl2XzE3MTU1MjcwNjY6MTcxNTUzMDY2Nl9WNA|width=754,height=356!
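A toy Python model of the proposed rule, offered as a hedged sketch: the classes below only mimic the shape of Spark's {{HashPartitioning}}, {{UnknownPartitioning}} and {{Coalesce}} and are not Spark's API. The hypothetical {{full_outer_output_partitioning}} reports hash partitioning on {{coalesce(left key, right key)}} only when both children are hash-partitioned exactly on their join keys with the same number of partitions, and otherwise keeps today's {{UnknownPartitioning}}. The hypothetical {{project_partitioning}} stands for the projection that renames the coalesced keys to {{a}} and {{b}}, and {{needs_shuffle}} is a simplified exact-match requirement check.

```python
from dataclasses import dataclass
from typing import Dict, Sequence, Tuple, Union


@dataclass(frozen=True)
class Col:
    name: str


@dataclass(frozen=True)
class Coalesce:
    children: Tuple[Col, ...]


Expr = Union[Col, Coalesce]


@dataclass(frozen=True)
class HashPartitioning:
    exprs: Tuple[Expr, ...]
    num_partitions: int


@dataclass(frozen=True)
class UnknownPartitioning:
    num_partitions: int


Partitioning = Union[HashPartitioning, UnknownPartitioning]


def full_outer_output_partitioning(left_keys: Sequence[Col], right_keys: Sequence[Col],
                                   left_part: Partitioning, right_part: Partitioning) -> Partitioning:
    """Hypothetical outputPartitioning rule for a shuffled full outer join."""
    if (isinstance(left_part, HashPartitioning)
            and isinstance(right_part, HashPartitioning)
            and left_part.exprs == tuple(left_keys)
            and right_part.exprs == tuple(right_keys)
            and left_part.num_partitions == right_part.num_partitions):
        # Each output row lives in partition hash(coalesce(l_i, r_i), ...) mod P.
        exprs = tuple(Coalesce((l, r)) for l, r in zip(left_keys, right_keys))
        return HashPartitioning(exprs, left_part.num_partitions)
    # Fallback: mirrors what a shuffled full outer join reports today.
    return UnknownPartitioning(left_part.num_partitions)


def project_partitioning(part: Partitioning, aliases: Dict[Expr, str]) -> Partitioning:
    """Rename partitioning expressions through a projection's aliases (simplified)."""
    if isinstance(part, HashPartitioning):
        exprs = tuple(Col(aliases[e]) if e in aliases else e for e in part.exprs)
        return HashPartitioning(exprs, part.num_partitions)
    return part


def needs_shuffle(part: Partitioning, required: Sequence[Expr]) -> bool:
    """Simplified check: no shuffle only for hash partitioning on exactly `required`."""
    return not (isinstance(part, HashPartitioning) and part.exprs == tuple(required))


t1a, t1b, t2a, t2b = Col("t1.a"), Col("t1.b"), Col("t2.a"), Col("t2.b")
P = 200
join_part = full_outer_output_partitioning(
    [t1a, t1b], [t2a, t2b],
    HashPartitioning((t1a, t1b), P), HashPartitioning((t2a, t2b), P))
l_part = project_partitioning(
    join_part, {Coalesce((t1a, t2a)): "a", Coalesce((t1b, t2b)): "b"})
print(needs_shuffle(l_part, [Col("a"), Col("b")]))  # False: the join with t3 can reuse it
```

The exact-match conditions keep the sketch conservative; they are a simplification, not a statement about how Spark compares partitionings.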



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
