Hi Stuart,


You don’t need the sortBy or sortWithinPartitions.



https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html





This is what the job should look like:



[Image: image001.png]
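
For reference, here is a minimal sketch of the idea. It is not the notebook's exact code. It assumes a local Spark 2.x+ session and writes both sides as bucketed tables with the default file format. The object name, the helper names (writeBucketed, joined) and the table name "transaction" are made up for illustration. The key point is that the tables are read back through the catalog with spark.table(...) rather than by file path, so the planner can see the bucketing metadata. Disabling broadcast joins is an extra assumption, so that the small example still plans a SortMergeJoin.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object BucketedJoinSketch {

  // Write n rows of (key, value) as a table bucketed into 3 buckets on "key".
  // No sortBy here: only the bucketing is needed to avoid the shuffle.
  def writeBucketed(spark: SparkSession, table: String, n: Int): Unit = {
    import spark.implicits._
    (0 until n)
      .map(i => (i, s"${table}_$i"))
      .toDF("key", "value")
      .write
      .mode("overwrite")
      .bucketBy(3, "key")
      .saveAsTable(table)
  }

  // Read both tables through the catalog (not by path) and join on "key".
  def joined(spark: SparkSession): DataFrame =
    spark.table("master").join(spark.table("transaction"), "key")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("bucketed-join-sketch")
      .master("local[*]")
      .getOrCreate()

    // Assumption: turn off broadcast joins so the join is planned as SortMergeJoin.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    writeBucketed(spark, "master", 1000000)
    writeBucketed(spark, "transaction", 1000000)

    // Inspect the plan; look for Exchange nodes under the join.
    joined(spark).explain()

    spark.stop()
  }
}
```

If the bucketing is picked up, the plan printed by explain() should have no Exchange on either side of the join. A Sort step can still appear, since the sketch does not use sortBy.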

On 11/12/16, 8:40 AM, "Stuart White" <stuart.whi...@gmail.com> wrote:

    Thanks for the reply.

    I understand that I need to use bucketBy() to write my master file,
    but I still can't seem to make it work as expected.  Here's a code
    example for how I'm writing my master file:

    Range(0, 1000000)
      .map(i => (i, s"master_$i"))
      .toDF("key", "value")
      .write
      .format("json")
      .bucketBy(3, "key")
      .sortBy("key")
      .saveAsTable("master")

    And here's how I'm reading it later and attempting to join to a
    transaction dataset:

    val master = spark
      .read
      .format("json")
      .json("spark-warehouse/master")
      .cache

    val transaction = Range(0, 1000000)
      .map(i => (i, s"transaction_$i"))
      .toDF("key", "value")
      .repartition(3, 'key)
      .sortWithinPartitions('key)
      .cache

    val results = master.join(transaction, "key")

    When I call results.explain(), I see that it is sorting both datasets
    before sending them through SortMergeJoin.

    == Physical Plan ==
    *Project [key#0L, value#1, value#53]
    +- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
       :- *Sort [key#0L ASC], false, 0
       :  +- Exchange hashpartitioning(key#0L, 200)
       :     +- *Filter isnotnull(key#0L)
       :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
       :           :  +- InMemoryRelation [key#0L, value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
       :           :     :  +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
       +- *Sort [cast(key#52 as bigint) ASC], false, 0
          +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
             +- InMemoryTableScan [key#52, value#53]
                :  +- InMemoryRelation [key#52, value#53], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                :     :  +- *Sort [key#52 ASC], false, 0
                :     :     +- Exchange hashpartitioning(key#52, 3)
                :     :        +- LocalTableScan [key#52, value#53]

    Here are my thoughts:

    1. I think I'm probably reading the master file back into memory
    incorrectly.  I think maybe I should be reading it as a Hive table
    rather than just a plain json file, but I can't seem to figure out how
    to do that.

    2. I don't understand exactly when partition counts/bucket counts are
    important.  For example, in this example, at the time it's written,
    master has 1 partition and is written into 3 buckets, resulting in 3
    files being written out.  Later when I generated my transaction
    dataset, I repartitioned it into 3 partitions.  Was that the correct
    thing to do (3 transaction partitions == 3 master buckets)?  Or should
    I have repartitioned master into 3 partitions before writing
    (resulting in 9 files if I still create 3 buckets)?  Basically, I
    don't understand how partitions and buckets should be handled.

    So, I feel like I'm close, but there are a few ways in which I don't
    understand how these pieces are supposed to fit together.  If this is
    explained somewhere, with a simple example, that would be great.

