Have you tried broadcasting your small table to perform the join?

import org.apache.spark.sql.functions.broadcast
val joined = bigDF.join(broadcast(smallDF), Seq("id"))
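Note that Spark only auto-broadcasts tables below spark.sql.autoBroadcastJoinThreshold (10 MB by default), so at 1.3GB the explicit broadcast() hint is what forces the plan, and the driver and executors need enough memory to hold the broadcast table. Assuming the joined value above, you can confirm the hint took effect:

// Expect BroadcastHashJoin in the physical plan; a SortMergeJoin or
// shuffled join means the broadcast hint didn't take.
joined.explain()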
On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab <as...@live.com> wrote:
> Hi Deepak,
> No...not really. Upping the disk size is a solution, but more expensive, as
> you can't easily attach EBS volumes to EMR clusters configured with data
> pipelines (which is what we're doing). I've tried collecting the 1.5G
> dataset in a hashmap and broadcasting. Timeouts seem to prevent that (even
> after upping the max driver result size). Increasing partition counts didn't
> help (the shuffle used up the temp space). I'm now looking at some form of
> clever broadcasting, or maybe falling back to chunking up the input,
> producing interim output, and unioning them for the final output [a sketch
> of this appears at the end of this thread]. Might even try using Spark
> Streaming pointing to the parquet and seeing if that helps.
>
> -Ashic.
>
> ------------------------------
> From: deepakmc...@gmail.com
> Date: Tue, 9 Aug 2016 17:31:19 +0530
> Subject: Re: Spark join and large temp files
> To: as...@live.com
>
> Hi Ashic,
> Did you find the resolution to this issue?
> Just curious to know what helped in this scenario.
>
> Thanks
> Deepak
>
> On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab <as...@live.com> wrote:
>
> Hi Deepak,
> Thanks for the response.
>
> Registering the temp tables didn't help. Here's what I have:
>
> val a = sqlContext.read.parquet(...).select("eid.id",
>   "name").withColumnRenamed("eid.id", "id")
> val b = sqlContext.read.parquet(...).select("id", "number")
>
> a.registerTempTable("a")
> b.registerTempTable("b")
>
> val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x JOIN
>   b y ON x.id = y.id")
>
> results.write.parquet(...)
>
> Is there something I'm missing?
>
> Cheers,
> Ashic.
>
> ------------------------------
> From: deepakmc...@gmail.com
> Date: Tue, 9 Aug 2016 00:01:32 +0530
> Subject: Re: Spark join and large temp files
> To: as...@live.com
> CC: user@spark.apache.org
>
> Register your dataframes as temp tables and then try the join on the temp
> tables.
> This should resolve your issue.
>
> Thanks
> Deepak
>
> On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab <as...@live.com> wrote:
>
> Hello,
> We have two parquet inputs of the following form:
>
> a: id:String, Name:String (1.5TB)
> b: id:String, Number:Int (1.3GB)
>
> We need to join these two to get (id, Number, Name). We've tried two
> approaches:
>
> a.join(b, Seq("id"), "right_outer")
>
> where a and b are dataframes. We also tried taking the rdds, mapping them
> to pair rdds with id as the key, and then joining. What we're seeing is
> that temp file usage increases during the join stage and fills up our
> disks, causing the job to crash. Is there a way to join these two datasets
> without, well...crashing?
>
> Note: the ids are unique, and there's a one-to-one mapping between the two
> datasets.
>
> Any help would be appreciated.
>
> -Ashic.
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
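For the archives, a rough sketch of the chunk-and-union fallback Ashic describes above. Treat it as an illustration only: the paths and chunk count are hypothetical, the crc32-modulo split is one possible choice of stable hash on the String ids, and it assumes each per-chunk shuffle fits in local temp space.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, crc32, lit, pmod}

val numChunks = 16  // tune so a single chunk's shuffle fits on local disk

// Hypothetical paths; schemas as in the original question.
val big   = sqlContext.read.parquet("/data/big")    // id, name   (1.5TB)
val small = sqlContext.read.parquet("/data/small")  // id, number (1.3GB)

// Stable split on the join key, so matching ids land in the same chunk.
def chunkOf(df: DataFrame, i: Int): DataFrame =
  df.where(pmod(crc32(col("id")), lit(numChunks)) === i)

// Join chunk by chunk, writing interim output for each piece.
(0 until numChunks).foreach { i =>
  chunkOf(big, i).join(chunkOf(small, i), Seq("id"))
    .write.parquet(s"/data/interim/chunk_$i")
}

// Union the interim outputs into the final result.
(0 until numChunks)
  .map(i => sqlContext.read.parquet(s"/data/interim/chunk_$i"))
  .reduce(_ unionAll _)
  .write.parquet("/data/final")

The point of splitting both sides on the same hash of the join key is that the two tables agree on which chunk an id belongs to, so each small join is complete on its own and the unioned output equals the full join.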