Hi Gourav,

Thanks for your input. As mentioned previously, we've tried the broadcast. We failed to broadcast 1.5GB; perhaps some tuning could help. We see CPU go up to 100%, and then workers die during the broadcast. I'm not sure it's a good idea to broadcast that much, given that Spark's automatic broadcast join by default uses a threshold of just 10MB to decide whether to broadcast or not.

As for Redis, we don't need a separate Redis cluster or anything. We're using embedded Redis on the driver, which lives only for the duration of the job. It's essentially a way to have some memory on the driver that can accommodate 1.5GB and allows access over the network. https://github.com/kstyrc/embedded-redis makes this trivial to do. I don't know whether this is a 2011 way of solving the problem or not, but http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs seems to suggest that a good approach to joining a huge dataset with one that can't be made smaller is to use a database. We've gone with that, and it seems to be working.

We've tried all the other recommendations (broadcast the dataframe as part of the join, explicitly broadcast a hashmap from the driver, register temp tables, etc.) and nothing else has worked. The parquet dataframe doesn't have a partitioner when loaded, and any operation requiring a network shuffle fills up the temp disks. Within these constraints, the database approach turned out to be the only thing we could get working (without paying double or treble for nodes with more disk space to hold the temp files).

Regards,
Ashic.
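P.S. For concreteness, this is roughly the shape of the lookup we ended up with (a simplified sketch, not our actual code; the port, variable names, and the missing connection cleanup are illustrative, and it assumes executors can reach the driver's host on that port):

    import redis.embedded.RedisServer
    import redis.clients.jedis.Jedis

    // Driver side: start an embedded Redis and load the smaller table into it.
    val redisServer = new RedisServer(6379)        // port is illustrative
    redisServer.start()

    val loader = new Jedis("localhost", 6379)
    b.collect().foreach { row =>                   // b: (id, Number), ~1.3GB
      loader.set(row.getString(0), row.getInt(1).toString)
    }
    loader.close()

    val driverHost = java.net.InetAddress.getLocalHost.getHostAddress

    // Executor side: look up Number by id, one Jedis connection per partition.
    val joined = a.rdd.mapPartitions { rows =>     // a: (id, Name), 1.5TB
      val jedis = new Jedis(driverHost, 6379)
      rows.map { row =>
        (row.getString(0), jedis.get(row.getString(0)), row.getString(1))
      }
    }

    // ... run whatever actions are needed on `joined`, then:
    redisServer.stop()

The embedded server only exists on the driver for the lifetime of the job; the executors just open plain Jedis connections to it, so nothing about the big table ever needs to shuffle.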
From: gourav.sengu...@gmail.com
Date: Thu, 11 Aug 2016 21:52:07 +0100
Subject: Re: Spark join and large temp files
To: bteeu...@gmail.com
CC: user@spark.apache.org

The point is that if you have skewed data, then one single reducer will end up taking a very long time; you don't even need to try this, just search Google: skewed data is a known problem in joins, even in Spark.

Therefore, instead of using a join, if the use case permits, just write a UDF that works as a lookup (a rough sketch is at the bottom of this mail, below the quoted messages). Using broadcast is the Spark way, and someone mentioned here the use of Redis, which I remember used to be the workaround in 2011 in the initial days of MR.

Regards,
Gourav

On Thu, Aug 11, 2016 at 9:24 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:

Hmm, hashing will probably send all of the records with the same key to the same partition / machine. I'd try it out, and hope that if you have a few super-large keys bigger than the RAM available on one node, they spill to disk. Maybe play with persist() and a different storage level.

On Aug 11, 2016, at 9:48 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

Hi Ben, and that will take care of skewed data?

Gourav

On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:

When you read both 'a' and 'b', can you try repartitioning both by column 'id'? If you .cache() and .count() to force a shuffle, it'll push the records that will be joined to the same executors. So:

    a = spark.read.parquet('path_to_table_a').repartition('id').cache()
    a.count()

    b = spark.read.parquet('path_to_table_b').repartition('id').cache()
    b.count()

And then join..

On Aug 8, 2016, at 8:17 PM, Ashic Mahtab <as...@live.com> wrote:

Hello,

We have two parquet inputs of the following form:

    a: id: String, Name: String (1.5TB)
    b: id: String, Number: Int (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

    a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the rdds, mapping them to pair rdds with id as the key, and then joining. What we're seeing is that temp file usage increases during the join stage and fills up our disks, causing the job to crash. Is there a way to join these two datasets without, well... crashing?

Note: the ids are unique, and there's a one-to-one mapping between the two datasets.

Any help would be appreciated.

-Ashic.
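A rough sketch of the UDF-as-lookup idea mentioned above (illustrative only; the column names follow the schemas in the original mail, and it assumes the smaller table can be collected on the driver and broadcast, which at ~1.3GB may need driver and executor memory tuning):

    import org.apache.spark.sql.functions.udf

    // Collect the smaller table to the driver as a map and broadcast it.
    val idToNumber: Map[String, Int] =
      b.rdd.map(r => (r.getString(0), r.getInt(1))).collectAsMap().toMap

    val idToNumberBc = spark.sparkContext.broadcast(idToNumber)

    // UDF doing the lookup instead of a join; yields null where there is no match.
    val lookupNumber = udf((id: String) => idToNumberBc.value.get(id))

    val result = a.withColumn("Number", lookupNumber(a("id")))
    // result columns: id, Name, Number

The lookup avoids shuffling the 1.5TB side entirely; the trade-off is that the whole smaller table has to fit in memory on the driver and on every executor.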