Thanks for the reply. I understand that I need to use bucketBy() to write my master file, but I still can't seem to make it work as expected. Here's the code I'm using to write my master file:
Range(0, 1000000)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .write
  .format("json")
  .bucketBy(3, "key")
  .sortBy("key")
  .saveAsTable("master")

And here's how I'm reading it back later and attempting to join it to a transaction dataset:

val master = spark
  .read
  .format("json")
  .json("spark-warehouse/master")
  .cache

val transaction = Range(0, 1000000)
  .map(i => (i, s"transaction_$i"))
  .toDF("key", "value")
  .repartition(3, 'key)
  .sortWithinPartitions('key)
  .cache

val results = master.join(transaction, "key")

When I call results.explain(), I see that it is sorting both datasets before sending them through SortMergeJoin:

== Physical Plan ==
*Project [key#0L, value#1, value#53]
+- *SortMergeJoin [key#0L], [cast(key#52 as bigint)], Inner
   :- *Sort [key#0L ASC], false, 0
   :  +- Exchange hashpartitioning(key#0L, 200)
   :     +- *Filter isnotnull(key#0L)
   :        +- InMemoryTableScan [key#0L, value#1], [isnotnull(key#0L)]
   :              +- InMemoryRelation [key#0L, value#1], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
   :                    +- *Scan json [key#0L,value#1] Format: JSON, InputPaths: file:/work/spark-warehouse/master, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint,value:string>
   +- *Sort [cast(key#52 as bigint) ASC], false, 0
      +- Exchange hashpartitioning(cast(key#52 as bigint), 200)
         +- InMemoryTableScan [key#52, value#53]
               +- InMemoryRelation [key#52, value#53], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- *Sort [key#52 ASC], false, 0
                        +- Exchange hashpartitioning(key#52, 3)
                           +- LocalTableScan [key#52, value#53]

Here are my thoughts:

1. I think I'm probably reading the master file back into memory incorrectly. I suspect I should be reading it as a Hive table rather than as plain JSON files, but I can't figure out how to do that (my best guess is sketched at the end of this message).

2. I don't understand exactly when partition counts and bucket counts matter. In this example, master has 1 partition at the time it's written and is written into 3 buckets, resulting in 3 files on disk. Later, when I generated my transaction dataset, I repartitioned it into 3 partitions. Was that the correct thing to do (3 transaction partitions == 3 master buckets)? Or should I have repartitioned master into 3 partitions before writing (resulting in 9 files if I still create 3 buckets)? That alternative is also sketched at the end of this message. Basically, I don't understand how partitions and buckets are supposed to interact.

So, I feel like I'm close, but there are a few ways in which I don't understand how these pieces are supposed to fit together. If this is explained somewhere, with a simple example, that would be great.
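In case it helps clarify point 1, here is my best guess at what the read-back should look like. This is only a guess on my part: I'm assuming saveAsTable registered a table named "master" in the session catalog and that spark.table is the right way to pick it up again, and I haven't confirmed that reading it this way preserves the bucketing metadata:

val master = spark
  .table("master")   // read through the catalog instead of spark.read.json on the warehouse files
  .cache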
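And to make point 2 concrete, this is the alternative write I was considering, with master repartitioned by key before the bucketed write. It's purely hypothetical (the table name master_repartitioned is just a placeholder so it doesn't collide with the existing table), and the 9-files outcome is my expectation rather than something I've verified:

Range(0, 1000000)
  .map(i => (i, s"master_$i"))
  .toDF("key", "value")
  .repartition(3, 'key)       // 3 partitions by key before writing
  .write
  .format("json")
  .bucketBy(3, "key")         // still 3 buckets, so I expect up to 3 files per partition (9 total)
  .sortBy("key")
  .saveAsTable("master_repartitioned")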