Hi Gourav,

Thanks for your input. As mentioned previously, we've tried the broadcast. We failed to broadcast 1.5GB...perhaps some tuning could help. We see CPU go up to 100%, and then workers die during the broadcast. I'm not sure it's a good idea to broadcast that much anyway, as Spark's automatic broadcast join by default uses a threshold of just 10MB to decide whether to broadcast or not.
As for Redis, we don't need a separate Redis cluster or anything. We're using embedded Redis on the driver, and it lives for the duration of the job. It's essentially a way to have some memory on the driver that can accommodate 1.5GB and allow access over the network. https://github.com/kstyrc/embedded-redis makes this trivial to do.
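In case it helps, the driver-side setup is roughly this - a sketch only, assuming the kstyrc/embedded-redis artifact and a plain Jedis client on the classpath (both are assumptions, and the port and column names are illustrative):

import redis.embedded.RedisServer
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

// Redis runs inside the driver JVM for the lifetime of the job.
val redisServer = new RedisServer(6379)
redisServer.start()

// Load the small (id -> Number) table into Redis without collecting it all at once.
val jedis = new Jedis("localhost", 6379)
b.select("id", "Number").toLocalIterator().asScala.foreach { row =>
  jedis.set(row.getString(0), row.getInt(1).toString)
}
jedis.close()
// redisServer.stop() once the job is done.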
I don't know if this is a 2011 way of solving the problem or not, but http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs seems to suggest that a good approach to joining a huge dataset with one that can't be made smaller is to use a database. We've gone with that, and it seems to be working. We've tried all the other recommendations (broadcast the dataframe as part of the join, explicitly broadcast a hashmap from the driver, register temp tables, etc.) - and nothing else has worked. The parquet dataframe doesn't have a partitioner when loaded, and any operation requiring a network shuffle fills up the temp disks. Within these constraints, the database approach turned out to be the only thing we could get working (without paying double/treble for nodes with more disk space to hold the temp files).
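The lookup side then becomes a map over the big table rather than a join - again just a sketch, using mapPartitions so each partition reuses one connection (Jedis, the port, and the missing-key default of -1 are all illustrative assumptions):

import redis.clients.jedis.Jedis

// The executors reach the driver-hosted Redis over the network.
val driverHost = spark.sparkContext.getConf.get("spark.driver.host")

val enriched = a.select("id", "Name").rdd.mapPartitions { rows =>
  val jedis = new Jedis(driverHost, 6379)
  // Materialise the partition so the connection can be closed before returning.
  val result = rows.map { row =>
    val id = row.getString(0)
    val number = Option(jedis.get(id)).map(_.toInt).getOrElse(-1)
    (id, number, row.getString(1))
  }.toArray
  jedis.close()
  result.iterator
}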
Regards,
Ashic.

From: gourav.sengu...@gmail.com
Date: Thu, 11 Aug 2016 21:52:07 +0100
Subject: Re: Spark join and large temp files
To: bteeu...@gmail.com
CC: user@spark.apache.org

The point is that if you have skewed data, then a single reducer will end up taking a very long time. You don't even need to try this out - a quick search will show that skewed data is a well-known problem for joins, even in Spark.
Therefore, instead of using a join, if the use case permits, just write a UDF that works as a lookup. Using broadcast is the Spark way, and someone mentioned here the use of Redis, which I remember used to be the workaround in 2011, in the initial days of MR.
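For instance, a broadcast-map-plus-UDF lookup might look roughly like this (an illustrative Scala sketch only, reusing the column names from the original mail):

import org.apache.spark.sql.functions.udf

// Collect the small table into a map on the driver and broadcast it to the executors.
val lookup = spark.sparkContext.broadcast(
  b.rdd.map(row => row.getString(0) -> row.getInt(1)).collectAsMap()
)

// The UDF resolves Number per id, so no shuffle-based join is needed.
val numberFor = udf((id: String) => lookup.value.get(id))
val joined = a.withColumn("Number", numberFor(a("id")))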
Regards,
Gourav
On Thu, Aug 11, 2016 at 9:24 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
Hmm, hashing will probably send all of the records with the same key to the same partition / machine. I'd try it out, and hope that if you have a few super-large keys bigger than the RAM available on one node, they spill to disk. Maybe play with persist() and a different StorageLevel.
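Something along these lines, for example (a Scala sketch; which storage level to pick depends on the cluster):

import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

// Allow blocks that don't fit in memory to spill to local disk.
val aById = a.repartition(col("id")).persist(StorageLevel.MEMORY_AND_DISK)
aById.count()  // force materialisation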
On Aug 11, 2016, at 9:48 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
Hi Ben,
and that will take care of skewed data?
Gourav 
On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
When you read both 'a' and 'b', can you try repartitioning both by column 'id'? If you .cache() and .count() to force a shuffle, it'll push the records that will be joined to the same executors.

So:

a = spark.read.parquet('path_to_table_a').repartition('id').cache()
a.count()

b = spark.read.parquet('path_to_table_b').repartition('id').cache()
b.count()

And then join...

On Aug 8, 2016, at 8:17 PM, Ashic Mahtab <as...@live.com> wrote:
Hello,We have two parquet inputs of the following form:
a: id:String, Name:String  (1.5TB)b: id:String, Number:Int  (1.3GB)
We need to join these two to get (id, Number, Name). We've tried two approaches:
a.join(b, Seq("id"), "right_outer")
where a and b are dataframes. We also tried taking the rdds, mapping them to 
pair rdds with id as the key, and then joining. What we're seeing is that temp 
file usage is increasing on the join stage, and filling up our disks, causing 
the job to crash. Is there a way to join these two data sets without 
well...crashing?
Note, the ids are unique, and there's a one to one mapping between the two 
datasets. 
Any help would be appreciated.
-Ashic. 