[ https://issues.apache.org/jira/browse/SPARK-48244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
GANHONGNAN updated SPARK-48244:
-------------------------------
Description:
h1. Demo SQL
{code:sql}
select *
from (
  select
    coalesce(t1.a, t2.a) as a,
    coalesce(t1.b, t2.b) as b,
    t1.c, t2.d
  from t1 full outer join t2
    on t1.a = t2.a and t1.b = t2.b
) l join t3
  on l.a = t3.a and l.b = t3.b
{code}
This SQL pattern is used *frequently* in ODS (operational data store) scenarios. Table {{t1}} stores historical data (e.g. business data before today), while {{t2}} stores incremental data (e.g. today's business orders). The inner subquery upserts {{t2}} into {{t1}} to obtain the full data set up to today.
h1. Current Physical Plan
!image-2024-05-12-23-23-38-998.png|width=398,height=319!
h1. *Equivalent* Optimized Plan
!image-2024-05-12-23-24-00-375.png|width=314,height=278!
h2. Benefit
Avoids re-shuffling the (potentially large) result of the full outer join before it is joined with {{t3}}.
h1. Proof of *Equivalence*
Assume the parallelism (number of shuffle partitions) is P.
* It suffices to prove that every row (l.a, l.b, ...) produced by the full outer join already resides in the RDD partition whose ID is {{hash(l.a, l.b) mod P}}; if so, the join result does not need to be hash-partitioned again.

||Join match case||l.a = coalesce(t1.a, t2.a), l.b = coalesce(t1.b, t2.b)||hash(l.a, l.b) mod P||RDD partition ID of (l.a, l.b, ...)||Does the full outer join's output partitioning satisfy the next join's requirement?||
|Match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle performed for the full outer join, the partition ID of both (t1.a, t1.b, ...) and (t2.a, t2.b, ...) is {{hash(t1.a, t1.b) mod P}} (which equals {{hash(t2.a, t2.b) mod P}})|Yes|
|No match in t2 (right side is null)|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle performed for the full outer join, the partition ID of (t1.a, t1.b, ...) is {{hash(t1.a, t1.b) mod P}}|Yes|
|No match in t1 (left side is null)|Equal to {{(t2.a, t2.b)}}|Equal to {{hash(t2.a, t2.b) mod P}}|Because of the shuffle performed for the full outer join, the partition ID of (t2.a, t2.b, ...) is {{hash(t2.a, t2.b) mod P}}|Yes|
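The case analysis above can be checked with a small simulation. This is a toy model under stated assumptions, not Spark code: Python's built-in {{hash}} stands in for Spark's {{hash(...)}} (which uses Murmur3), each Python list stands in for one RDD partition, and the helper names ({{part_id}}, {{shuffle}}, {{shuffled_full_outer_join}}) and the sample rows are hypothetical. It hash-partitions {{t1}} and {{t2}} on (a, b), runs a full outer join locally inside each partition, computes the coalesced keys, and checks that every output row already sits in partition {{hash(l.a, l.b) mod P}}.

{code:python}
from typing import List, Optional, Tuple

Key = Optional[int]
Row = Tuple[Key, Key, Optional[str], Optional[str]]  # (l.a, l.b, t1.c, t2.d)

P = 4  # assumed number of shuffle partitions (hypothetical)


def part_id(a: Key, b: Key, num_partitions: int) -> int:
    """Toy stand-in for hash(a, b) mod P; Spark itself uses Murmur3."""
    return hash((a, b)) % num_partitions


def coalesce(x, y):
    """Two-argument SQL COALESCE."""
    return x if x is not None else y


def sql_eq(x, y) -> bool:
    """SQL '=' in a join condition: NULL never equals anything, even NULL."""
    return x is not None and y is not None and x == y


def shuffle(rows, num_partitions: int):
    """Hash-partition rows on their first two columns (a, b)."""
    parts = [[] for _ in range(num_partitions)]
    for row in rows:
        parts[part_id(row[0], row[1], num_partitions)].append(row)
    return parts


def shuffled_full_outer_join(t1, t2, num_partitions: int) -> List[Tuple[int, Row]]:
    """Shuffle both sides on (a, b), then full-outer-join each partition locally.

    Returns (partition_id, row) pairs with row = (l.a, l.b, t1.c, t2.d).
    """
    out = []
    pairs = zip(shuffle(t1, num_partitions), shuffle(t2, num_partitions))
    for pid, (left, right) in enumerate(pairs):
        right_matched = [False] * len(right)
        for a1, b1, c in left:
            matched = False
            for j, (a2, b2, d) in enumerate(right):
                if sql_eq(a1, a2) and sql_eq(b1, b2):  # "Match" case
                    matched = True
                    right_matched[j] = True
                    out.append((pid, (coalesce(a1, a2), coalesce(b1, b2), c, d)))
            if not matched:  # no match in t2: t2 columns are NULL
                out.append((pid, (coalesce(a1, None), coalesce(b1, None), c, None)))
        for j, (a2, b2, d) in enumerate(right):
            if not right_matched[j]:  # no match in t1: t1 columns are NULL
                out.append((pid, (coalesce(None, a2), coalesce(None, b2), None, d)))
    return out


# Hypothetical sample data: t1 = historical rows, t2 = incremental rows.
T1 = [(1, 10, "c1"), (2, 20, "c2"), (3, 30, "c3"), (None, 40, "c4")]
T2 = [(2, 20, "d2"), (3, 31, "d3"), (5, 50, "d5"), (None, 40, "d4")]

if __name__ == "__main__":
    for pid, row in shuffled_full_outer_join(T1, T2, P):
        # Each output row is already in partition hash(l.a, l.b) mod P.
        assert part_id(row[0], row[1], P) == pid
        print(pid, row)
{code}

In this model the raw {{t1.a}}/{{t1.b}} columns are null for rows that exist only in {{t2}} (and vice versa), so neither side's key columns alone describe where output rows live; the coalesced keys do, which is why the check is made on (l.a, l.b). The sample also contains a null key, which never matches under SQL equality but still ends up in partition {{hash(l.a, l.b) mod P}}.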
h1. Implementation
Change {{ShuffledJoin}}'s {{outputPartitioning}}. Currently it is {{UnknownPartitioning}} for a full outer join.
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YzQ1YTFkOTQyOTc1OWUzNjRiNDFiNjZkMzAzYzkyY2FfMHBJYU40T1Nua1gzbTlOeG1VOW1NaEdzY3RDSzJJa1RfVG9rZW46QnV3MmJiRkhnb3dSMVR4VHJSRWNCTmxGbnl2XzE3MTU1MjcwNjY6MTcxNTUzMDY2Nl9WNA|width=754,height=356!

was:
h1. Demo SQL
{code:sql}
select *
from (
  select
    coalesce(t1.a, t2.a) as a,
    coalesce(t1.b, t2.b) as b,
    t1.c, t2.d
  from t1 full outer join t2
    on t1.a = t2.a and t1.b = t2.b
) l join t3
  on l.a = t3.a and l.b = t3.b
{code}
This SQL pattern is used *frequently* in ODS (operational data store) scenarios. Table {{t1}} stores historical data (e.g. business data before today), while {{t2}} stores incremental data (e.g. today's business orders). The inner subquery upserts {{t2}} into {{t1}} to obtain the full data set up to today.
h1. Current Physical Plan
!image-2024-05-12-23-15-58-367.png|width=509,height=340!
h1. *Equivalent* Optimized Plan
!image-2024-05-12-23-16-46-156.png!
h2. Benefit
Avoids re-shuffling the (potentially large) result of the full outer join before it is joined with {{t3}}.
h1. Proof of *Equivalence*
Assume the parallelism (number of shuffle partitions) is P.
* It suffices to prove that every row (l.a, l.b, ...) produced by the full outer join already resides in the RDD partition whose ID is {{hash(l.a, l.b) mod P}}; if so, the join result does not need to be hash-partitioned again.

||Join match case||l.a = coalesce(t1.a, t2.a), l.b = coalesce(t1.b, t2.b)||hash(l.a, l.b) mod P||RDD partition ID of (l.a, l.b, ...)||Does the full outer join's output partitioning satisfy the next join's requirement?||
|Match|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle performed for the full outer join, the partition ID of both (t1.a, t1.b, ...) and (t2.a, t2.b, ...) is {{hash(t1.a, t1.b) mod P}} (which equals {{hash(t2.a, t2.b) mod P}})|Yes|
|No match in t2 (right side is null)|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle performed for the full outer join, the partition ID of (t1.a, t1.b, ...) is {{hash(t1.a, t1.b) mod P}}|Yes|
|No match in t1 (left side is null)|Equal to {{(t2.a, t2.b)}}|Equal to {{hash(t2.a, t2.b) mod P}}|Because of the shuffle performed for the full outer join, the partition ID of (t2.a, t2.b, ...) is {{hash(t2.a, t2.b) mod P}}|Yes|

h1. Implementation
Change {{ShuffledJoin}}'s {{outputPartitioning}}. Currently it is {{UnknownPartitioning}} for a full outer join.
!https://bytedance.larkoffice.com/space/api/box/stream/download/asynccode/?code=YzQ1YTFkOTQyOTc1OWUzNjRiNDFiNjZkMzAzYzkyY2FfMHBJYU40T1Nua1gzbTlOeG1VOW1NaEdzY3RDSzJJa1RfVG9rZW46QnV3MmJiRkhnb3dSMVR4VHJSRWNCTmxGbnl2XzE3MTU1MjcwNjY6MTcxNTUzMDY2Nl9WNA|width=754,height=356!

Summary: [improvement]Specify outputPartitioning of Full Outer Join (was: [improvement] Keep partitioning of Full Outer Join)

> [improvement]Specify outputPartitioning of Full Outer Join
> ----------------------------------------------------------
>
>                 Key: SPARK-48244
>                 URL: https://issues.apache.org/jira/browse/SPARK-48244
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.2.1
>            Reporter: GANHONGNAN
>            Priority: Major
>             Fix For: 3.2.1

--
This message was sent by Atlassian Jira
(v8.20.10#820010)