It seems like the number of files could get out of hand with this
approach.

For example, in the job that buckets and writes master, suppose we use the
default number of shuffle partitions (200), and suppose that in the next
job (the job where we join to transaction) we're also going to want 200
partitions, so master gets bucketed into 200 buckets.  That means master
would be written to disk as 40,000 files (200 partitions, each writing 200
bucket files).  Am I mistaken?

Is there some way to avoid this explosion of the number of files?  Or is
this just an unavoidable side-effect of Spark's bucketing implementation?
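
One idea I'm wondering about (just an untested sketch; masterDF and
numBuckets below are placeholders for my real data and settings):
repartition by the bucket column before the bucketed write, so that each
write task holds only one bucket's worth of data and writes roughly one
file per bucket, instead of one file per bucket per partition.

    val numBuckets = 200

    masterDF
      .repartition(numBuckets, 'key)   // align shuffle partitions with buckets
      .write
      .bucketBy(numBuckets, "key")
      .sortBy("key")
      .saveAsTable("master")

Would something like that work, or am I off track?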

Thanks again!

On Sun, Nov 13, 2016 at 9:24 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Hi Stuart,
>
> Yes, that's the query plan, but if you take a look at my screenshot, it
> skips the first stage since the datasets are co-partitioned.
>
> Thanks,
> Silvio
> ------------------------------
> *From:* Stuart White <stuart.whi...@gmail.com>
> *Sent:* Saturday, November 12, 2016 11:20:28 AM
> *To:* Silvio Fiorito
> *Cc:* user@spark.apache.org
> *Subject:* Re: Joining to a large, pre-sorted file
>
> Hi Silvio,
>
> Thanks very much for the response!
>
> I'm pretty new at reading explain plans, so maybe I'm misunderstanding
> what I'm seeing.
>
> Remember my goal is to sort master, write it out, later read it back in
> and have Spark "remember" that it's sorted, so I can do joins and Spark
> will not sort it again.
>
> Looking at the explain plan for the example job you provided, it looks to
> me like Spark is re-sorting master after reading it back in.  See the
> attachment for the Sort step I'm referring to.
>
> Am I misunderstanding the explain plan?
>
> Thanks again!
>
> On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>> Hi Stuart,
>>
>> You don’t need the sortBy or sortWithinPartitions.
>>
>> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html
>>
>> This is what the job should look like:
>>
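>> In code form, it's roughly along these lines (a simplified sketch;
>> masterDF and transactionDF are placeholders for your real datasets, and
>> the notebook linked above has the full example):
>>
>>     // Write both sides bucketed by the join key.
>>     masterDF.write.bucketBy(3, "key").saveAsTable("master")
>>     transactionDF.write.bucketBy(3, "key").saveAsTable("transaction")
>>
>>     // Read them back through the catalog so Spark sees the bucketing
>>     // metadata, then join. The shuffle (Exchange) is skipped because
>>     // both sides are already co-partitioned on "key".
>>     val results = spark.table("master").join(spark.table("transaction"), "key")
>>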
>> On 11/12/16, 8:40 AM, "Stuart White" <stuart.whi...@gmail.com> wrote:
>>
>>     Thanks for the reply.
>>
>>     I understand that I need to use bucketBy() to write my master file,
>>     but I still can't seem to make it work as expected.  Here's a code
>>     example for how I'm writing my master file:
>>
>>     Range(0, 1000000)
>>       .map(i => (i, s"master_$i"))
>>       .toDF("key", "value")
>>       .write
>>       .format("json")
>>       .bucketBy(3, "key")
>>       .sortBy("key")
>>       .saveAsTable("master")
>>
>>     And here's how I'm reading it later and attempting to join to a
>>     transaction dataset:
>>
>>     val master = spark
>>       .read
>>       .format("json")
>>       .json("spark-warehouse/master")
>>       .cache
>>
>>     val transaction = Range(0, 1000000)
>>       .map(i => (i, s"transaction_$i"))
>>       .toDF("key", "value")
>>       .repartition(3, 'key)
>>       .sortWithinPartitions('key)
>>       .cache
>>
>>     val results = master.join(transaction, "key")
>>
>>     When I call results.explain(), I see that it is sorting both datasets
>>     before sending them through SortMergeJoin.
>>
>>     == Physical Plan ==
>>     *Project [key#0L, value#1, value#53]
>>     +- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
>>        :- *Sort [key#0L ASC], false, 0
>>        :  +- Exchange hashpartitioning(key#0L, 200)
>>        :     +- *Filter isnotnull(key#0L)
>>        :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
>>        :           :  +- InMemoryRelation [key#0L, value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>        :           :     :  +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
>>        +- *Sort [cast(key#52 as bigint) ASC], false, 0
>>           +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
>>              +- InMemoryTableScan [key#52, value#53]
>>                 :  +- InMemoryRelation [key#52, value#53], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
>>                 :     :  +- *Sort [key#52 ASC], false, 0
>>                 :     :     +- Exchange hashpartitioning(key#52, 3)
>>                 :     :        +- LocalTableScan [key#52, value#53]
>>
>>     Here are my thoughts:
>>
>>     1. I think I'm probably reading the master file back into memory
>>     incorrectly.  I think maybe I should be reading it as a Hive table
>>     rather than just a plain json file, but I can't seem to figure out
>>     how to do that (my best guess is sketched after these notes).
>>
>>     2. I don't understand exactly when partition counts/bucket counts are
>>     important.  In this example, at the time it's written, master has 1
>>     partition and is written into 3 buckets, resulting in 3 files being
>>     written out.  Later, when I generated my transaction dataset, I
>>     repartitioned it into 3 partitions.  Was that the correct thing to do
>>     (3 transaction partitions == 3 master buckets)?  Or should I have
>>     repartitioned master into 3 partitions before writing (resulting in 9
>>     files if I still create 3 buckets)?  Basically, I don't understand
>>     how partitions and buckets should be handled.
>>
>>     So, I feel like I'm close, but there are a few ways in which I don't
>>     understand how these pieces are supposed to fit together.  If this is
>>     explained somewhere, with a simple example, that would be great.
>>
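>>     For what it's worth, here is my untested guess at what the
>>     table-based read in thought #1 might look like (assuming
>>     saveAsTable("master") registered the data as a table named "master"
>>     in the catalog):
>>
>>     // Read master via the catalog instead of spark.read.json, hoping
>>     // that the bucketing metadata from the write gets picked up.
>>     val master = spark.table("master")
>>
>>     val results = master.join(transaction, "key")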
>>
>>
>
>
