Have you tried broadcasting your small table to perform the join?

val joined = bigDF.join(broadcast(smallDF), ...)
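
Note that broadcast comes from org.apache.spark.sql.functions, and the hint
forces a broadcast join regardless of spark.sql.autoBroadcastJoinThreshold.
A minimal sketch, assuming the join key is "id" as in the thread below:

import org.apache.spark.sql.functions.broadcast

// Join key assumed to be "id" (per the thread below). The broadcast hint
// replicates the small (1.3GB) side to every executor so the 1.5TB side
// never has to be shuffled for the join.
val joined = bigDF.join(broadcast(smallDF), Seq("id"))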


On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab <as...@live.com> wrote:

> Hi Deepak,
> No...not really. Upping the disk size is a solution, but a more expensive
> one, as you can't easily attach EBS volumes to EMR clusters configured via
> Data Pipeline (which is what we're doing). I've tried collecting the 1.3GB
> dataset into a hashmap and broadcasting it, but timeouts seem to prevent
> that (even after upping the max driver result size). Increasing partition
> counts didn't help either (the shuffle still used up the temp space). I'm
> now looking at some form of clever broadcasting (sketched below), or maybe
> falling back to chunking up the input, producing interim outputs, and
> unioning them for the final output. Might even try pointing Spark Streaming
> at the parquet to see if that helps.
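>
> A rough sketch of that collect-and-broadcast attempt (names and column
> positions here are illustrative, not the exact code):
>
> // Pull the small (id -> number) table onto the driver. Illustrative
> // column positions; this is the step that hit the driver timeouts and
> // result-size limits.
> val numberById = b.rdd
>   .map(r => (r.getString(0), r.getInt(1)))
>   .collectAsMap()
> val numberByIdB = sc.broadcast(numberById)
>
> // Map-side "join": the 1.5TB side is scanned but never shuffled.
> val results = a.rdd.flatMap { r =>
>   numberByIdB.value.get(r.getString(0))
>     .map(number => (r.getString(0), number, r.getString(1)))
> }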
>
> -Ashic.
>
> ------------------------------
> From: deepakmc...@gmail.com
> Date: Tue, 9 Aug 2016 17:31:19 +0530
> Subject: Re: Spark join and large temp files
> To: as...@live.com
>
> Hi Ashic
> Did you find the resolution to this issue?
> Just curious to know what helped in this scenario.
>
> Thanks
> Deepak
>
>
> On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab <as...@live.com> wrote:
>
> Hi Deepak,
> Thanks for the response.
>
> Registering the temp tables didn't help. Here's what I have:
>
> val a = sqlContext.read.parquet(...).select("eid.id",
> "name").withColumnRenamed("eid.id", "id")
> val b = sqlContext.read.parquet(...).select("id", "number")
>
> a.registerTempTable("a")
> b.registerTempTable("b")
>
> val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x JOIN
> b y ON x.id = y.id")
>
> results.write.parquet(...)
>
> Is there something I'm missing?
>
> Cheers,
> Ashic.
>
> ------------------------------
> From: deepakmc...@gmail.com
> Date: Tue, 9 Aug 2016 00:01:32 +0530
> Subject: Re: Spark join and large temp files
> To: as...@live.com
> CC: user@spark.apache.org
>
>
> Register your dataframes as temp tables and then try the join on the temp
> tables.
> This should resolve your issue.
>
> Thanks
> Deepak
>
> On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab <as...@live.com> wrote:
>
> Hello,
> We have two parquet inputs of the following form:
>
> a: id:String, Name:String  (1.5TB)
> b: id:String, Number:Int  (1.3GB)
>
> We need to join these two to get (id, Number, Name). We've tried two
> approaches:
>
> a.join(b, Seq("id"), "right_outer")
>
> where a and b are DataFrames. We also tried taking the RDDs, mapping them
> to pair RDDs with id as the key, and then joining (roughly as sketched
> below). What we're seeing is that temp file usage keeps growing during the
> join stage and fills up our disks, crashing the job. Is there a way to
> join these two datasets without, well...crashing?
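>
> The RDD variant, roughly (column positions are illustrative):
>
> // Key both datasets by id. Illustrative column positions.
> val pairA = a.rdd.map(r => (r.getString(0), r.getString(1)))  // (id, name)
> val pairB = b.rdd.map(r => (r.getString(0), r.getInt(1)))     // (id, number)
>
> // A standard join shuffles both sides by key; shuffling the 1.5TB side
> // is what fills up the local temp space.
> val joined = pairA.join(pairB)  // RDD[(String, (String, Int))]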
>
> Note that the ids are unique, and there's a one-to-one mapping between the
> two datasets.
>
> Any help would be appreciated.
>
> -Ashic.
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>
