GANHONGNAN created SPARK-48244:
----------------------------------

             Summary: [improvement] Keep partitioning of Full Outer Join
                 Key: SPARK-48244
                 URL: https://issues.apache.org/jira/browse/SPARK-48244
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.1
            Reporter: GANHONGNAN
             Fix For: 3.2.1

h1. Demo SQL

{code:java}
select *
from (
    select coalesce(t1.a, t2.a) as a,
           coalesce(t1.b, t2.b) as b,
           t1.c,
           t2.d
    from t1
    full outer join t2
      on t1.a = t2.a and t1.b = t2.b
) l
join t3
  on l.a = t3.a and l.b = t3.b
{code}

This SQL pattern is *frequently* used in ODS scenarios. Table {{t1}} stores historical data (e.g. business data before today), while {{t2}} stores incremental data (e.g. today's business orders). The inner subquery upserts {{t2}} into {{t1}} to produce the full data set up to today.

h1. Current Physical Plan

!image-2024-05-12-23-15-58-367.png|width=509,height=340!

h1. *Equivalent* Optimized Plan

!image-2024-05-12-23-16-46-156.png!

h2. Benefit

The join with {{t3}} can reuse the partitioning produced by the full outer join, which avoids shuffling a large amount of data.

h1. Proof of *Equivalence*

Assume the parallelism is P.
 * It is necessary to prove that the RDD partition ID of each joined row (l.a, l.b, xxx) already equals {{hash(l.a, l.b) mod P}}. If it does, there is no need to hash-partition the join result again.

||Join match case||coalesce(t1.a, t2.a) as l.a, coalesce(t1.b, t2.b) as l.b||hash(l.a, l.b) mod P||RDD partition ID of (l.a, l.b, xxx)||Does the full outer join's output partitioning satisfy the next join's requirement?||
|Match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle performed for the first full outer join, the partition ID of (t1.a, t1.b, xxx) and of (t2.a, t2.b, xxx) equals {{hash(t1.a, t1.b) mod P}} (which equals {{hash(t2.a, t2.b) mod P}})|Yes|
|Right side missing (no matching {{t2}} row)|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle performed for the first full outer join, the partition ID of (t1.a, t1.b, xxx) equals {{hash(t1.a, t1.b) mod P}}|Yes|
|Left side missing (no matching {{t1}} row)|Equal to {{(t2.a, t2.b)}}|Equal to {{hash(t2.a, t2.b) mod P}}|Because of the shuffle performed for the first full outer join, the partition ID of (t2.a, t2.b, xxx) equals {{hash(t2.a, t2.b) mod P}}|Yes|

h1. Implementation

Change ShuffledJoin's outputPartitioning.
Currently, it is UnknownPartitioning for a full outer join.

!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YzQ1YTFkOTQyOTc1OWUzNjRiNDFiNjZkMzAzYzkyY2FfMHBJYU40T1Nua1gzbTlOeG1VOW1NaEdzY3RDSzJJa1RfVG9rZW46QnV3MmJiRkhnb3dSMVR4VHJSRWNCTmxGbnl2XzE3MTU1MjcwNjY6MTcxNTUzMDY2Nl9WNA|width=754,height=356!
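
The equivalence argument in the proof section can be checked on a toy example. The following is a minimal sketch, not Spark code: it assumes P = 4, uses Python's built-in {{hash}} as a stand-in for Spark's actual hash function, uses non-null integer keys only, and all helper names ({{partition_id}}, {{shuffle}}, {{full_outer_join_partition}}, {{joined_with_partition_ids}}) are hypothetical. It shuffles {{t1}} and {{t2}} by (a, b), performs the full outer join partition by partition, and verifies that every output row already sits in partition {{hash(l.a, l.b) mod P}}.

```python
from collections import defaultdict

P = 4  # assumed parallelism (number of shuffle partitions)


def partition_id(a, b, num_partitions=P):
    # Stand-in for hash partitioning; Python's hash() is used only for illustration.
    return hash((a, b)) % num_partitions


def shuffle(rows, num_partitions=P):
    """Place each (a, b, payload) row into the partition chosen by its key."""
    parts = defaultdict(list)
    for a, b, payload in rows:
        parts[partition_id(a, b, num_partitions)].append((a, b, payload))
    return parts


def coalesce(x, y):
    # Same semantics as SQL coalesce for two arguments.
    return x if x is not None else y


def full_outer_join_partition(left_rows, right_rows):
    """Full outer join of two co-located partitions on (a, b), emitting coalesced keys."""
    left, right = defaultdict(list), defaultdict(list)
    for row in left_rows:
        left[(row[0], row[1])].append(row)
    for row in right_rows:
        right[(row[0], row[1])].append(row)

    out = []
    for key in left.keys() | right.keys():
        # A missing side is represented by a single all-null row.
        for l in left.get(key) or [(None, None, None)]:
            for r in right.get(key) or [(None, None, None)]:
                la, lb, c = l
                ra, rb, d = r
                out.append((coalesce(la, ra), coalesce(lb, rb), c, d))
    return out


def joined_with_partition_ids(t1, t2, num_partitions=P):
    """Return (partition id, joined row) pairs without any shuffle after the join."""
    p1, p2 = shuffle(t1, num_partitions), shuffle(t2, num_partitions)
    result = []
    for pid in range(num_partitions):
        for row in full_outer_join_partition(p1.get(pid, []), p2.get(pid, [])):
            result.append((pid, row))
    return result


t1 = [(1, 10, "c1"), (2, 20, "c2"), (3, 30, "c3")]  # historical rows (a, b, c)
t2 = [(2, 20, "d2"), (4, 40, "d4")]                 # incremental rows (a, b, d)

result = joined_with_partition_ids(t1, t2)

# Every joined row is already in the partition that hashing (l.a, l.b) would pick.
assert all(partition_id(a, b) == pid for pid, (a, b, c, d) in result)
```

The three cases of the table appear in the sample data: key (2, 20) matches on both sides, keys (1, 10) and (3, 30) have no {{t2}} row, and key (4, 40) has no {{t1}} row. The check relies only on both inputs being shuffled with the same function and the same P, which is the situation the proof assumes.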