[
https://issues.apache.org/jira/browse/SPARK-48244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun updated SPARK-48244:
----------------------------------
Target Version/s: (was: 3.2.1)
> Specify outputPartitioning of Full Outer Join to Avoid Shuffle
> --------------------------------------------------------------
>
> Key: SPARK-48244
> URL: https://issues.apache.org/jira/browse/SPARK-48244
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.2.1
> Reporter: GANHONGNAN
> Priority: Major
> Fix For: 3.2.1
>
>
> h1. Demo SQL
> {code:sql}
> select *
> from (
>   select
>     coalesce(t1.a, t2.a) as a,
>     coalesce(t1.b, t2.b) as b,
>     t1.c, t2.d
>   from t1 full outer join t2
>     on t1.a = t2.a and t1.b = t2.b
> ) l join t3
>   on l.a = t3.a and l.b = t3.b
> {code}
>
> This SQL pattern is *frequently* used in ODS (operational data store) scenarios.
> Table {{t1}} stores historical data (e.g. business data before today), while
> {{t2}} stores incremental data (e.g. today's business orders). The inner
> subquery upserts {{t2}} into {{t1}} to get the full data up to today.
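A small plain-Scala model of what the inner subquery computes may make the upsert semantics concrete. This is a sketch, not Spark code. It assumes integer, non-NULL keys and models the missing side of the full outer join as `None`. The row classes `T1Row`, `T2Row` and `LRow` are hypothetical and named after the tables in the SQL.

```scala
// Plain-Scala model (not Spark) of the inner subquery:
//   select coalesce(t1.a, t2.a) as a, coalesce(t1.b, t2.b) as b, t1.c, t2.d
//   from t1 full outer join t2 on t1.a = t2.a and t1.b = t2.b
// Hypothetical row types; None plays the role of SQL NULL for a missing side.
object UpsertModel {
  final case class T1Row(a: Int, b: Int, c: String) // historical data
  final case class T2Row(a: Int, b: Int, d: String) // today's incremental data
  final case class LRow(a: Int, b: Int, c: Option[String], d: Option[String])

  private def sameKey(l: T1Row, r: T2Row): Boolean = l.a == r.a && l.b == r.b

  // Full outer equi-join on (a, b): matched pairs, then unmatched t1 rows,
  // then unmatched t2 rows.
  def fullOuterJoin(t1: Seq[T1Row], t2: Seq[T2Row]): Seq[(Option[T1Row], Option[T2Row])] = {
    val matched = for { l <- t1; r <- t2 if sameKey(l, r) } yield (Option(l), Option(r))
    val leftOnly = t1.filterNot(l => t2.exists(r => sameKey(l, r)))
      .map(l => (Option(l), Option.empty[T2Row]))
    val rightOnly = t2.filterNot(r => t1.exists(l => sameKey(l, r)))
      .map(r => (Option.empty[T1Row], Option(r)))
    matched ++ leftOnly ++ rightOnly
  }

  // Project(coalesce(t1.a, t2.a) as a, coalesce(t1.b, t2.b) as b, t1.c, t2.d)
  def project(rows: Seq[(Option[T1Row], Option[T2Row])]): Seq[LRow] =
    rows.map { case (l, r) =>
      LRow(
        l.map(_.a).orElse(r.map(_.a)).get, // at least one side is always present
        l.map(_.b).orElse(r.map(_.b)).get,
        l.map(_.c),
        r.map(_.d))
    }
}
```

For example, with t1 keys (1,1), (2,2) and t2 keys (2,2), (3,3), the projected result has key (2,2) carrying both `c` and `d`, key (1,1) carrying only `c`, and key (3,3) carrying only `d`.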
> h1. Current Physical Plan
>
> {code}
>      Scan t1                      Scan t2
>         |                            |
> Exchange(t1.a, t1.b)        Exchange(t2.a, t2.b)
>            \                      /
>             \                    /
>                 FullOuter Join
>            [t1.a, t1.b][t2.a, t2.b]
>                       |
>       Project(coalesce(t1.a, t2.a) as l.a,
>               coalesce(t1.b, t2.b) as l.b)
>                       |
>              Exchange(l.a, l.b)             Scan t3
>                       \                        |
>                        \             Exchange(t3.a, t3.b)
>                         \                    /
>                                 Join
>                        [l.a, l.b][t3.a, t3.b]
> {code}
>
>
> h1. *Equivalent* Optimized Plan
>
> {code}
>      Scan t1                      Scan t2
>         |                            |
> Exchange(t1.a, t1.b)        Exchange(t2.a, t2.b)
>            \                      /
>             \                    /
>                 FullOuter Join
>            [t1.a, t1.b][t2.a, t2.b]
>                       |
>       Project(coalesce(t1.a, t2.a) as l.a,
>               coalesce(t1.b, t2.b) as l.b)
>                       |
>           +----------------------+
>           |  Exchange(l.a, l.b)  |          Scan t3
>           |  (remove this)       |             |
>           +----------------------+  Exchange(t3.a, t3.b)
>                       \                     /
>                                Join
>                       [l.a, l.b][t3.a, t3.b]
> {code}
>
>
> h2. Benefit
> Avoids a second shuffle of the full outer join output, which can be a large amount of data.
>
>
> h1. Proof of *Equivalence*
> Assume the shuffle parallelism is P, and that hash is the same function used by
> all Exchange operators in the plan.
> It is sufficient to prove that every row (l.a, l.b, ...) produced by the full
> outer join already resides in the RDD partition whose id is
> {{hash(l.a, l.b) mod P}}. If so, the join result does not need to be
> hash-partitioned again before the second join.
>
> ||Join match case||(l.a, l.b) = (coalesce(t1.a, t2.a), coalesce(t1.b, t2.b))||hash(l.a, l.b) mod P||RDD partition id of (l.a, l.b, ...)||Does the full outer join output satisfy the next join's requirement?||
> |Match (t1.a = t2.a and t1.b = t2.b)|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle introduced by the full outer join, the partition id of both (t1.a, t1.b, ...) and (t2.a, t2.b, ...) is {{hash(t1.a, t1.b) mod P}} (which equals {{hash(t2.a, t2.b) mod P}})|Yes|
> |Right miss (t1 row with no matching t2 row)|Equal to {{(t1.a, t1.b)}}|Equal to {{hash(t1.a, t1.b) mod P}}|Because of the shuffle introduced by the full outer join, the partition id of (t1.a, t1.b, ...) is {{hash(t1.a, t1.b) mod P}}|Yes|
> |Left miss (t2 row with no matching t1 row)|Equal to {{(t2.a, t2.b)}}|Equal to {{hash(t2.a, t2.b) mod P}}|Because of the shuffle introduced by the full outer join, the partition id of (t2.a, t2.b, ...) is {{hash(t2.a, t2.b) mod P}}|Yes|
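The table's case analysis can be checked with a small simulation. This is a plain-Scala sketch under stated assumptions:

* Scala's tuple hash (`##`) stands in for Spark's Murmur3 hash.
* P is a hypothetical 8.
* Keys are non-NULL integer pairs.

Each side is bucketed by the hash of its join key and joined within each bucket. Every output row's bucket is then compared with hash(coalesced key) mod P. The object and method names are hypothetical.

```scala
// Toy model of the proof; the argument only needs both Exchanges below the
// full outer join and the downstream join to use the same hash and the same P.
object PartitionProof {
  val P = 8 // hypothetical shuffle parallelism

  // Stand-in for "hash(key) mod P"; floorMod keeps the id non-negative.
  def partitionOf(key: (Int, Int)): Int = Math.floorMod(key.##, P)

  // One output row of the full outer join: either side may be missing (NULL).
  final case class Out(lKey: Option[(Int, Int)], rKey: Option[(Int, Int)], partition: Int)

  // Shuffle both sides by join key, then full-outer-join inside each partition.
  def fullOuterJoinPartitioned(t1Keys: Seq[(Int, Int)], t2Keys: Seq[(Int, Int)]): Seq[Out] = {
    val left = t1Keys.groupBy(partitionOf)
    val right = t2Keys.groupBy(partitionOf)
    (0 until P).flatMap { pid =>
      val ls = left.getOrElse(pid, Seq.empty)
      val rs = right.getOrElse(pid, Seq.empty)
      val matched = for (l <- ls; r <- rs if l == r) yield Out(Some(l), Some(r), pid)
      val leftOnly = ls.filterNot(l => rs.contains(l)).map(l => Out(Some(l), None, pid))
      val rightOnly = rs.filterNot(r => ls.contains(r)).map(r => Out(None, Some(r), pid))
      matched ++ leftOnly ++ rightOnly
    }
  }

  // (l.a, l.b) = (coalesce(t1.a, t2.a), coalesce(t1.b, t2.b)). With non-NULL keys
  // a side is either wholly present or wholly missing, so this is per-column coalesce.
  def coalescedKey(o: Out): (Int, Int) = o.lKey.orElse(o.rKey).get

  // True when every output row already sits in partition hash(l.a, l.b) mod P.
  def alreadyPartitioned(out: Seq[Out]): Boolean =
    out.forall(o => o.partition == partitionOf(coalescedKey(o)))
}
```

On any mix of matched and unmatched keys, `alreadyPartitioned` should return true. That is the condition under which the Exchange(l.a, l.b) above the Project is redundant.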
>
>
> h1. Implementation
> Change the {{outputPartitioning}} of {{ShuffledJoin}}
> ({{org.apache.spark.sql.execution.joins.ShuffledJoin#outputPartitioning}}).
> Currently it returns {{UnknownPartitioning}} for {{FullOuter}}:
>
> {code:scala}
> // Current code in org.apache.spark.sql.execution.joins.ShuffledJoin
> override def outputPartitioning: Partitioning = joinType match {
>   case _: InnerLike =>
>     PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
>   case LeftOuter => left.outputPartitioning
>   case RightOuter => right.outputPartitioning
>   // A full outer join emits rows whose keys may be NULL on either side, so
>   // neither child's partitioning describes the output; this is the case to change.
>   case FullOuter => UnknownPartitioning(left.outputPartitioning.numPartitions)
>   case LeftExistence(_) => left.outputPartitioning
>   case x =>
>     throw new IllegalArgumentException(
>       s"ShuffledJoin should not take $x as the JoinType")
> }
> {code}
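One possible shape of the change is sketched below. It uses hypothetical, heavily simplified stand-ins for Spark's planner types: `Expr`, `Attr`, `Coalesce`, `HashPartitioning` and `UnknownPartitioning` here are toy case classes, not the real Catalyst classes, and the helper names are made up.

The idea is that when both children are hash partitioned on their join keys with the same number of partitions, the full outer join reports a hash partitioning over coalesce(leftKey_i, rightKey_i). A Project that aliases those expressions (as l.a, l.b) can then restate that partitioning in terms of the aliases. This is what would let the downstream join skip its Exchange.

```scala
// Hypothetical toy model; NOT Spark's org.apache.spark.sql.catalyst classes.
object ProposedPartitioning {
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  final case class Coalesce(children: Seq[Expr]) extends Expr

  sealed trait Partitioning { def numPartitions: Int }
  final case class HashPartitioning(exprs: Seq[Expr], numPartitions: Int) extends Partitioning
  final case class UnknownPartitioning(numPartitions: Int) extends Partitioning

  // Proposed FullOuter case: if both sides were shuffled on exactly their join
  // keys with the same P, the output is hash partitioned on coalesce(lk_i, rk_i).
  def fullOuterOutputPartitioning(
      leftKeys: Seq[Expr],
      rightKeys: Seq[Expr],
      left: Partitioning,
      right: Partitioning): Partitioning = (left, right) match {
    case (HashPartitioning(l, n1), HashPartitioning(r, n2))
        if l == leftKeys && r == rightKeys && n1 == n2 =>
      HashPartitioning(leftKeys.zip(rightKeys).map { case (lk, rk) => Coalesce(Seq(lk, rk)) }, n1)
    case _ =>
      UnknownPartitioning(left.numPartitions) // conservative fallback, as today
  }

  // A Project such as coalesce(t1.a, t2.a) AS l.a rewrites the partitioning
  // expressions to their aliases when every expression has one.
  def aliasPartitioning(p: Partitioning, aliases: Map[Expr, Attr]): Partitioning = p match {
    case HashPartitioning(exprs, n) if exprs.forall(aliases.contains) =>
      HashPartitioning(exprs.map(aliases), n)
    case other => UnknownPartitioning(other.numPartitions)
  }
}
```

Falling back to `UnknownPartitioning` whenever the children are not partitioned exactly on the join keys keeps the sketch conservative. In that situation the planner would simply insert the Exchange as it does now.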
--
This message was sent by Atlassian Jira
(v8.20.10#820010)