Hi Stuart,

Yes, that's the query plan, but if you take a look at my screenshot, it skips the first stage since the datasets are co-partitioned.
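In case it's useful, here's roughly what I mean. This is just a minimal, untested sketch on my end (the table names, the parquet format, and the bucket count are illustrative, and it assumes a SparkSession named spark as in your snippets); the key point is writing both sides with bucketBy on the join key and reading them back through spark.table:

import spark.implicits._

// Write both sides as bucketed tables on the join key, with the same bucket count.
Range(0, 1000000)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .format("parquet")
  .bucketBy(3, "key")
  .saveAsTable("master_bucketed")

Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .write
  .format("parquet")
  .bucketBy(3, "key")
  .saveAsTable("transaction_bucketed")

// Read them back through the catalog (spark.table), not spark.read on the raw
// files; that is what preserves the bucketing metadata so the two sides are
// co-partitioned.
val results = spark.table("master_bucketed")
  .join(spark.table("transaction_bucketed"), "key")

// The Exchange should be gone from both sides of the SortMergeJoin;
// only the in-partition sorts remain.
results.explain()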
Thanks,
Silvio

________________________________
From: Stuart White <stuart.whi...@gmail.com>
Sent: Saturday, November 12, 2016 11:20:28 AM
To: Silvio Fiorito
Cc: user@spark.apache.org
Subject: Re: Joining to a large, pre-sorted file

Hi Silvio,

Thanks very much for the response!

I'm pretty new at reading explain plans, so maybe I'm misunderstanding what I'm seeing. Remember, my goal is to sort master, write it out, later read it back in, and have Spark "remember" that it's sorted, so I can do joins and Spark will not sort it again.

Looking at the explain plan for the example job you provided, it looks to me like Spark is re-sorting master after reading it back in. See the attachment for the Sort step I'm referring to.

Am I misunderstanding the explain plan?

Thanks again!

On Sat, Nov 12, 2016 at 9:34 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote:

Hi Stuart,

You don’t need the sortBy or sortWithinPartitions.

https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/594325853373464/879901972425732/6861830365114179/latest.html

This is what the job should look like:

[inline image: screenshot of the job's stages]

On 11/12/16, 8:40 AM, "Stuart White" <stuart.whi...@gmail.com> wrote:

Thanks for the reply.

I understand that I need to use bucketBy() to write my master file, but I still can't seem to make it work as expected.

Here's a code example for how I'm writing my master file:

Range(0, 1000000)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .format("json")
  .bucketBy(3, "key")
  .sortBy("key")
  .saveAsTable("master")

And here's how I'm reading it later and attempting to join to a transaction dataset:

val master = spark
  .read
  .format("json")
  .json("spark-warehouse/master")
  .cache

val transaction = Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .repartition(3, 'key)
  .sortWithinPartitions('key)
  .cache

val results = master.join(transaction, "key")

When I call results.explain(), I see that it is sorting both datasets before sending them through SortMergeJoin.

== Physical Plan ==
*Project [key#0L, value#1, value#53]
+- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
   :- *Sort [key#0L ASC], false, 0
   :  +- Exchange hashpartitioning(key#0L, 200)
   :     +- *Filter isnotnull(key#0L)
   :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
   :              +- InMemoryRelation [key#0L, value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                    +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
   +- *Sort [cast(key#52 as bigint) ASC], false, 0
      +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
         +- InMemoryTableScan [key#52, value#53]
               +- InMemoryRelation [key#52, value#53], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Sort [key#52 ASC], false, 0
                        +- Exchange hashpartitioning(key#52, 3)
                           +- LocalTableScan [key#52, value#53]

Here are my thoughts:

1. I think I'm probably reading the master file back into memory incorrectly. I think maybe I should be reading it as a Hive table rather than just a plain json file, but I can't seem to figure out how to do that.

2. I don't understand exactly when partition counts/bucket counts are important. For example, in this case, master has 1 partition at the time it's written and is written into 3 buckets, resulting in 3 files being written out.
Later, when I generated my transaction dataset, I repartitioned it into 3 partitions. Was that the correct thing to do (3 transaction partitions == 3 master buckets)? Or should I have repartitioned master into 3 partitions before writing (resulting in 9 files if I still create 3 buckets)? Basically, I don't understand how partitions and buckets should be handled.

So, I feel like I'm close, but there are a few ways in which I don't understand how these pieces are supposed to fit together. If this is explained somewhere, with a simple example, that would be great.
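In case it helps clarify what I'm asking, here's my rough guess at what the read side should look like. This is just a guess on my part, not something I've gotten working; the table and dataset names are the same as in my snippets above:

import spark.implicits._

// Guess: read master back through the catalog (spark.table) instead of
// spark.read.json on the raw files, so the bucketing/sorting metadata
// recorded by saveAsTable is actually picked up.
val master = spark.table("master")

// Guess: repartition/sort transaction on the join key to match the 3 buckets.
val transaction = Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .repartition(3, 'key)
  .sortWithinPartitions('key)

val results = master.join(transaction, "key")

// Hoping to see no Exchange (and ideally no Sort) under the master side.
results.explain()

Is that the right general shape, or am I off track?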