You want to look at the bucketBy option when you save the master file out. That 
way it will be pre-partitioned by the join column, eliminating the shuffle on 
the larger file.
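A minimal sketch of what that write could look like (assuming Spark 2.x; `masterDF`, the bucket count, and the names are placeholders -- note that bucketBy requires writing to a table via saveAsTable rather than a plain path):

```scala
// Write the master dataset bucketed and sorted by the join key.
// bucketBy output must go through saveAsTable, not save(path).
masterDF.write
  .bucketBy(200, "joinKey")       // bucket count is a tuning choice for your data volume
  .sortBy("joinKey")              // keep each bucket sorted so the sort is reusable too
  .saveAsTable("master_bucketed")
```

On later reads, spark.table("master_bucketed") carries the bucketing and sort metadata, so the planner can avoid shuffling (and re-sorting) the master side of the join.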



From: Stuart White <stuart.whi...@gmail.com>
Date: Thursday, November 10, 2016 at 8:39 PM
To: Jörn Franke <jornfra...@gmail.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Joining to a large, pre-sorted file

Yes.  In my original question, when I said I wanted to pre-sort the master 
file, I should have said "pre-sort and pre-partition the file".
Years ago, I did this with Hadoop MapReduce.  I pre-sorted/partitioned the 
master file into N partitions.  Then, when a transaction file would arrive, I 
would sort/partition the transaction file on the join key into N partitions.  
Then I could perform what was called a map-side join.
Basically, I want to do the same thing in Spark.  And it looks like all the 
pieces to accomplish this exist, but I can't figure out how to connect all the 
dots.  It seems like this functionality is pretty new so there aren't a lot of 
examples available.

On Thu, Nov 10, 2016 at 7:33 PM, Jörn Franke 
<jornfra...@gmail.com> wrote:
Can you split the files beforehand into several files (e.g., by the column you 
do the join on)?

On 10 Nov 2016, at 23:45, Stuart White 
<stuart.whi...@gmail.com> wrote:
I have a large "master" file (~700m records) that I frequently join smaller 
"transaction" files to.  (The transaction files have tens of millions of 
records, so they are too large for a broadcast join.)
I would like to pre-sort the master file, write it to disk, and then, in 
subsequent jobs, read the file off disk and join to it without having to 
re-sort it.  I'm using Spark SQL, and my understanding is that the Spark 
Catalyst Optimizer will choose an optimal join algorithm if it is aware that 
the datasets are sorted.  So, the trick is to make the optimizer aware that the 
master file is already sorted.
I think SPARK-12394<https://issues.apache.org/jira/browse/SPARK-12394> provides 
this functionality, but I can't seem to put the pieces together for how to use 
it.
Could someone possibly provide a simple example of how to:

  1.  Sort a master file by a key column and write it to disk in such a way 
that its "sorted-ness" is preserved.
  2.  In a later job, read a transaction file, sort/partition it as necessary.  
Read the master file, preserving its sorted-ness.  Join the two DataFrames in 
such a way that the master rows are not sorted again.
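A hedged sketch of both steps using the bucketing support discussed in this thread (paths, table names, and the bucket count are illustrative; assumes Spark 2.0+ with Hive support so bucketed tables can be recorded in the catalog):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .enableHiveSupport()  // bucketed tables need the session catalog
  .getOrCreate()

// Step 1: one-time job -- bucket and sort the master file by the join key,
// persisting it as a table so the partitioning/sort metadata is preserved.
spark.read.parquet("/data/master")
  .write
  .bucketBy(200, "key")
  .sortBy("key")
  .saveAsTable("master_bucketed")

// Step 2: later job -- read the bucketed master back and join a new
// transaction file to it.
val master = spark.table("master_bucketed")
val txns   = spark.read.parquet("/data/txns/2016-11-10")

// The planner sees the master's bucketing/sort metadata, so only the
// smaller transaction side should be shuffled and sorted for the
// sort-merge join.
val joined = txns.join(master, "key")
joined.explain()  // check that Exchange/Sort appear only on the transaction side
```

The explain() output is the easiest way to confirm the master rows are not being re-sorted: if an Exchange or Sort node shows up under the bucketed table scan, the metadata is not being picked up.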
Thanks!
