Hey Terry,

Thanks for the response! I'm not sure it ends up working, though: the
bucketing still seems to require an exchange before the join. Both tables
below are saved bucketed by "x", yet the join on ("x", "y") still gets an
exchange on both sides:
*(5) Project [x#29, y#30, z#31, z#37]
+- *(5) SortMergeJoin [x#29, y#30], [x#35, y#36], Inner
   :- *(2) Sort [x#29 ASC NULLS FIRST, y#30 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(x#29, y#30, 200)
   :     +- *(1) Project [x#29, y#30, z#31]
   :        +- *(1) Filter (isnotnull(x#29) && isnotnull(y#30))
   :           +- *(1) FileScan parquet default.ax[x#29,y#30,z#31] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/ax], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
   +- *(4) Sort [x#35 ASC NULLS FIRST, y#36 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(x#35, y#36, 200)
         +- *(3) Project [x#35, y#36, z#37]
            +- *(3) Filter (isnotnull(x#35) && isnotnull(y#36))
               +- *(3) FileScan parquet default.bx[x#35,y#36,z#37] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/pwoody/tm/spark-2.4.5-bin-hadoop2.7/spark-warehouse/bx], PartitionFilters: [], PushedFilters: [IsNotNull(x), IsNotNull(y)], ReadSchema: struct<x:int,y:int,z:int>, SelectedBucketsCount: 200 out of 200
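
For reference, here is roughly how the tables above were written (a sketch
from memory; df1/df2 stand in for the two inputs, and 200 buckets matches the
SelectedBucketsCount in the plan):

// Save both sides bucketed by "x" as parquet tables in the warehouse.
df1.write.format("parquet").bucketBy(200, "x").saveAsTable("ax")
df2.write.format("parquet").bucketBy(200, "x").saveAsTable("bx")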

Best,
Pat



On Sun, May 31, 2020 at 3:15 PM Terry Kim <yumin...@gmail.com> wrote:

> You can use bucketBy to avoid shuffling in your scenario. This test suite
> has some examples:
> https://github.com/apache/spark/blob/45cf5e99503b00a6bd83ea94d6d92761db1a00ab/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala#L343
>
> Thanks,
> Terry
>
> On Sun, May 31, 2020 at 7:43 AM Patrick Woody <patrick.woo...@gmail.com>
> wrote:
>
>> Hey all,
>>
>> I have one large table, A, and two medium-sized tables, B & C, that I'm
>> trying to join efficiently. The result of A join B is multiplicative (much
>> larger than either input), so I'd like to avoid shuffling that intermediate
>> result. For this example, let's just assume each table has three columns:
>> x, y, z. Everything below was tested locally on Spark 2.4.5.
>>
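>> A minimal sketch of the setup (hypothetical values; the real tables are
>> larger):
>>
>> import spark.implicits._
>>
>> // Three local DataFrames with the same schema, matching the
>> // LocalTableScan nodes in the plans below; A is the large side.
>> val A = Seq((1, 1, 1), (2, 2, 2)).toDF("x", "y", "z")
>> val B = Seq((1, 1, 10), (2, 2, 20)).toDF("x", "y", "z")
>> val C = Seq((1, 1, 100), (2, 2, 200)).toDF("x", "y", "z")
>>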
>> I'd like to perform the following join:
>> A.join(B, Seq("x", "y")).join(C, Seq("x", "z"))
>> This outputs the following physical plan:
>> == Physical Plan ==
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>    :           :     +- LocalTableScan [x#32, y#33, z#34]
>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>    :                 +- LocalTableScan [x#72, y#73, z#74]
>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>          +- LocalTableScan [x#52, y#53, z#54]
>>
>>
>> I may be misremembering, but I thought that in the past you could
>> pre-partition each table by "x" and it would satisfy the join's
>> distribution requirements, since both sides are already clustered by the
>> key using the same hash function (assuming numPartitions lines up,
>> obviously). However, it seems an additional exchange is inserted anyway:
>>
>> A.repartition($"x").join(B.repartition($"x"), Seq("x",
>> "y")).join(C.repartition($"x"), Seq("x", "z"))
>> *(6) Project [x#32, z#34, y#33, z#74, y#53]
>> +- *(6) SortMergeJoin [x#32, z#34], [x#52, z#54], Inner
>>    :- *(4) Sort [x#32 ASC NULLS FIRST, z#34 ASC NULLS FIRST], false, 0
>>    :  +- Exchange hashpartitioning(x#32, z#34, 200)
>>    :     +- *(3) Project [x#32, y#33, z#34, z#74]
>>    :        +- *(3) SortMergeJoin [x#32, y#33], [x#72, y#73], Inner
>>    :           :- *(1) Sort [x#32 ASC NULLS FIRST, y#33 ASC NULLS FIRST], false, 0
>>    :           :  +- Exchange hashpartitioning(x#32, y#33, 200)
>>    :           :     +- Exchange hashpartitioning(x#32, 200)
>>    :           :        +- LocalTableScan [x#32, y#33, z#34]
>>    :           +- *(2) Sort [x#72 ASC NULLS FIRST, y#73 ASC NULLS FIRST], false, 0
>>    :              +- Exchange hashpartitioning(x#72, y#73, 200)
>>    :                 +- Exchange hashpartitioning(x#72, 200)
>>    :                    +- LocalTableScan [x#72, y#73, z#74]
>>    +- *(5) Sort [x#52 ASC NULLS FIRST, z#54 ASC NULLS FIRST], false, 0
>>       +- Exchange hashpartitioning(x#52, z#54, 200)
>>          +- ReusedExchange [x#52, y#53, z#54], Exchange hashpartitioning(x#32, 200)
>>
>> Note that using this "strategy" with groupBy("x", "y") works fine, though
>> I assume that is because it doesn't have to consider the other side of a
>> join.
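>>
>> For example, this only shows the single exchange from the repartition (a
>> sketch; count() stands in for any aggregate):
>>
>> // HashPartitioning on "x" already satisfies the groupBy on ("x", "y"),
>> // so no second exchange is inserted.
>> A.repartition($"x").groupBy("x", "y").count().explain()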
>>
>> Did this use to work, or am I simply confusing it with groupBy? Either
>> way, any thoughts on how I can avoid shuffling the bulk of the join result?
>>
>> Thanks,
>> Pat
>>