Hey all,

I have one large table, A, and two medium-sized tables, B and C, that I'm
trying to join efficiently. The result of A join B is multiplicative (it has
more rows than A), so I'd like to avoid shuffling that result. For this
example, let's just assume each table has three columns: x, y and z. All of
the below was tested locally on Spark 2.4.5.

I'd like to perform the following join:
A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
This outputs the following physical plan:
== Physical Plan ==
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- LocalTableScan [x#52, y#53, z#54]
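
For reference, the logical result of that query can be sketched on plain Scala collections, with no Spark involved. This is a hypothetical illustration: the Rec case class, the ReferenceJoin object and the toy rows are made up, and the output tuple simply mirrors the plan's final Project (x, A.z, A.y, B.z, C.y). Three B rows share the key (1, 1), which shows the fan-out of A join B.

```scala
// Hypothetical sketch on local collections (no Spark). Rec and the rows
// below are illustrative stand-ins for A, B and C with columns x, y, z.
final case class Rec(x: Int, y: Int, z: Int)

object ReferenceJoin {
  val a: Seq[Rec] = Seq(Rec(1, 1, 10), Rec(1, 2, 20), Rec(2, 1, 10))
  // Three B rows share the key (x = 1, y = 1), so A join B fans out.
  val b: Seq[Rec] = Seq(Rec(1, 1, 100), Rec(1, 1, 101), Rec(1, 1, 102), Rec(2, 1, 200))
  val c: Seq[Rec] = Seq(Rec(1, 5, 10), Rec(2, 6, 10))

  // A.join(B, Seq("x", "y")): keep x, A.y, A.z and B.z.
  val ab: Seq[(Int, Int, Int, Int)] =
    for {
      l <- a
      r <- b if l.x == r.x && l.y == r.y
    } yield (l.x, l.y, l.z, r.z)

  // .join(C, Seq("x", "z")): z is A's z here; the tuple mirrors the
  // plan's final Project (x, A.z, A.y, B.z, C.y).
  val abc: Seq[(Int, Int, Int, Int, Int)] =
    for {
      (x, y, z, bz) <- ab
      s <- c if s.x == x && s.z == z
    } yield (x, z, y, bz, s.y)
}
```

With these rows, ab has 4 rows against 3 in a; in the real job, that inflated intermediate is what sits under Exchange hashpartitioning(x#32, z#34, 200).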


I may be misremembering, but I thought that in the past you could
pre-partition each table by "x" and it would satisfy the join's requirements,
since both sides would already be clustered by the key using the same hash
function (assuming numPartitions lines up, obviously). However, it seems
Spark inserts another exchange:

A.repartition($"x").join(B.repartition($"x"), Seq("x", "y")).join(C.repartition($"x"), Seq("x", "z"))
*(6) Project [x#32, z#34, y#33, z#74, y#53]
+- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
   :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#32, z#34, 200)
   :     +- *(3) Project [x#32, y#33, z#34, z#74]
   :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
   :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
   :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
   :           :     +- Exchange hashpartitioning(x#32, 200)
   :           :        +- LocalTableScan [x#32, y#33, z#34]
   :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
   :              +- Exchange hashpartitioning(x#72, y#73, 200)
   :                 +- Exchange hashpartitioning(x#72, 200)
   :                    +- LocalTableScan [x#72, y#73, z#74]
   +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#52, z#54, 200)
         +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)
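
The reasoning behind pre-partitioning by x can be checked outside Spark with a small simulation: bucket A, B and C on x alone with the same function and partition count, run both joins inside each partition, and compare against one global join. Every (x, y) match and every (x, z) match shares x, so it lands in a single partition. This is a hypothetical sketch, not Spark's planner: partitionOf uses x.hashCode modulo numPartitions rather than Spark's Murmur3-based hashpartitioning, the names and rows are made up, and it says nothing about which distribution a given Spark version's SortMergeJoin actually requires.

```scala
// Hypothetical simulation (plain Scala, no Spark) of joining three tables
// that were all pre-partitioned on x with the same function and count.
final case class Rec(x: Int, y: Int, z: Int)

object CoPartitionedJoin {
  val numPartitions = 4

  // Stand-in for hashpartitioning(x, numPartitions); NOT Spark's Murmur3 hash.
  def partitionOf(x: Int): Int = Math.floorMod(x.hashCode, numPartitions)

  // Bucket a table by x alone, loosely mimicking repartition($"x").
  def partitionByX(rows: Seq[Rec]): IndexedSeq[Seq[Rec]] =
    (0 until numPartitions).map(p => rows.filter(r => partitionOf(r.x) == p))

  // A join B on (x, y), then join C on (x, A.z); same tuple shape as the plan.
  def join3(a: Seq[Rec], b: Seq[Rec], c: Seq[Rec]): Seq[(Int, Int, Int, Int, Int)] =
    for {
      l <- a
      r <- b if r.x == l.x && r.y == l.y
      s <- c if s.x == l.x && s.z == l.z
    } yield (l.x, l.z, l.y, r.z, s.y)

  val a = Seq(Rec(1, 1, 10), Rec(1, 2, 20), Rec(2, 1, 10), Rec(7, 3, 30))
  val b = Seq(Rec(1, 1, 100), Rec(1, 1, 101), Rec(2, 1, 200), Rec(7, 3, 700))
  val c = Seq(Rec(1, 5, 10), Rec(2, 6, 10), Rec(7, 9, 30))

  // Reference: one global three-way join.
  val global: Seq[(Int, Int, Int, Int, Int)] = join3(a, b, c)

  // Co-partitioned: partition p of A only ever meets partition p of B and C,
  // so the A join B intermediate never has to move between partitions.
  val local: Seq[(Int, Int, Int, Int, Int)] = {
    val (pa, pb, pc) = (partitionByX(a), partitionByX(b), partitionByX(c))
    (0 until numPartitions).flatMap(p => join3(pa(p), pb(p), pc(p)))
  }
}
```

Comparing CoPartitionedJoin.local.sorted with CoPartitionedJoin.global.sorted gives equal results. That matches the intuition in the question, but only for this simulated hash function.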

Note that this "strategy" works fine with groupBy("x", "y"), though I assume
that's because an aggregation doesn't have to consider the other side of a
join.
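
The groupBy case can be sketched the same way. This is plain Scala with hypothetical names and data, using x.hashCode in place of Spark's hash. Once rows are partitioned on x alone, every row with a given (x, y) key sits in one partition, so grouping per partition matches grouping globally. There is a single input, so no second side's partitioning has to line up.

```scala
// Hypothetical simulation (plain Scala, no Spark) of groupBy(x, y) after
// partitioning on x only.
final case class Rec(x: Int, y: Int, z: Int)

object PartitionLocalGroupBy {
  val numPartitions = 3

  // Stand-in for hashpartitioning(x, numPartitions); NOT Spark's Murmur3 hash.
  def partitionOf(x: Int): Int = Math.floorMod(x.hashCode, numPartitions)

  val rows: Seq[Rec] =
    Seq(Rec(1, 1, 10), Rec(1, 1, 11), Rec(1, 2, 12), Rec(4, 1, 13), Rec(5, 2, 14))

  // Row count per (x, y) key.
  def countByXY(rs: Seq[Rec]): Map[(Int, Int), Int] =
    rs.groupBy(r => (r.x, r.y)).map { case (k, v) => k -> v.size }

  // Reference: aggregate over the whole table at once.
  val global: Map[(Int, Int), Int] = countByXY(rows)

  // The same aggregation run independently inside each x-partition.
  val perPartition: Seq[Map[(Int, Int), Int]] =
    (0 until numPartitions).map(p => countByXY(rows.filter(r => partitionOf(r.x) == p)))
}
```

Concatenating the per-partition maps reproduces global exactly, and no (x, y) key appears in more than one partition.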

Did this use to work, or am I simply confusing it with groupBy? Either way,
any thoughts on how I can avoid shuffling the bulk of the join result?

Thanks,
Pat
