Hi Ashic,

That is a pretty 2011 way of solving the problem. What is more painful
about this way of working is that you need to load the data into Redis,
keep a Redis instance (or cluster) running, and if you are working across
several clusters you may have to install Redis on all of them or hammer
your driver.

Did you try using UDFs on broadcast data? The solution is pretty much the
same, except that instead of Redis you use a broadcast variable, and it
scales wonderfully across several clusters of machines.
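
A minimal sketch of what I mean, in Scala (assuming the small table fits
comfortably in driver and executor memory; variable names are illustrative):

import org.apache.spark.sql.functions.{col, udf}

// Collect the small table to the driver and broadcast it as a map.
val numberById: Map[String, Int] =
  b.collect().map(r => r.getString(0) -> r.getInt(1)).toMap
val numberByIdBc = spark.sparkContext.broadcast(numberById)

// UDF that looks each id up in the broadcast map on the executors.
val lookupNumber = udf((id: String) => numberByIdBc.value.get(id))

// No shuffle of the 1.5TB table: every row just calls the UDF.
val joined = a.withColumn("Number", lookupNumber(col("id")))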


Regards,
Gourav Sengupta

On Thu, Aug 11, 2016 at 11:23 PM, Ashic Mahtab <as...@live.com> wrote:

> Hi Ben,
> Already tried that. The thing is that any form of shuffle on the big
> dataset (which repartition will cause) puts a node's chunk into /tmp, and
> that fills up the disk. I solved the problem by storing the 1.5GB dataset
> in an embedded Redis instance on the driver, and doing a straight flatMap
> of the big dataset (doing lookups in Redis). This avoids shuffling, and
> prevents the /tmp fill-up issue.
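>
> Roughly what that looks like (a sketch; names and column positions are
> illustrative, and it assumes a Jedis client with b's id -> Number pairs
> preloaded into the embedded Redis):
>
> import redis.clients.jedis.Jedis
>
> val joined = a.rdd.mapPartitions { rows =>
>   // One connection per partition, pointing at the Redis on the driver.
>   // Connection cleanup is omitted for brevity.
>   val jedis = new Jedis("driver-host", 6379)
>   rows.flatMap { row =>
>     val id = row.getString(0)
>     Option(jedis.get(id)).map(n => (id, n.toInt, row.getString(1)))
>   }
> }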
>
> -Ashic.
>
> ------------------------------
> Subject: Re: Spark join and large temp files
> From: bteeu...@gmail.com
> Date: Thu, 11 Aug 2016 22:24:42 +0200
> CC: user@spark.apache.org
> To: gourav.sengu...@gmail.com
>
>
> Hmm, hashing will probably send all of the records with the same key to
> the same partition / machine.
> I'd try it out, and hope that if you have a few super-large keys bigger
> than the RAM available on one node, they spill to disk. Maybe play with
> persist() and using a different StorageLevel.
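>
> Something like this, in Scala (just a sketch to illustrate the storage
> level; the path is made up):
>
> import org.apache.spark.sql.functions.col
> import org.apache.spark.storage.StorageLevel
>
> val a = spark.read.parquet("path_to_table_a")
>   .repartition(col("id"))
>   .persist(StorageLevel.MEMORY_AND_DISK)
> a.count()  // materialize; partitions that don't fit in memory spill to disk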
>
> On Aug 11, 2016, at 9:48 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
> wrote:
>
> Hi Ben,
>
> and that will take care of skewed data?
>
> Gourav
>
> On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
>
> When you read both 'a' and 'b', can you try repartitioning both by column
> 'id'?
> If you .cache() and .count() to force a shuffle, it'll push the records
> that will be joined to the same executors.
>
> So:
> a = spark.read.parquet('path_to_table_a').repartition('id').cache()
> a.count()
>
> b = spark.read.parquet('path_to_table_b').repartition('id').cache()
> b.count()
>
> And then join.
>
>
> On Aug 8, 2016, at 8:17 PM, Ashic Mahtab <as...@live.com> wrote:
>
> Hello,
> We have two parquet inputs of the following form:
>
> a: id:String, Name:String  (1.5TB)
> b: id:String, Number:Int  (1.3GB)
>
> We need to join these two to get (id, Number, Name). We've tried two
> approaches:
>
> a.join(b, Seq("id"), "right_outer")
>
> where a and b are DataFrames. We also tried taking the RDDs, mapping them
> to pair RDDs with id as the key, and then joining (sketched below). What
> we're seeing is that temp file usage keeps growing during the join stage
> and filling up our disks, causing the job to crash. Is there a way to join
> these two datasets without, well... crashing?
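>
> The pair-RDD attempt was roughly (a sketch; column positions are assumed):
>
> val aPairs = a.rdd.map(r => (r.getString(0), r.getString(1)))  // (id, Name)
> val bPairs = b.rdd.map(r => (r.getString(0), r.getInt(1)))     // (id, Number)
> val joined = aPairs.join(bPairs)  // (id, (Name, Number))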
>
> Note, the ids are unique, and there's a one-to-one mapping between the two
> datasets.
>
> Any help would be appreciated.
>
> -Ashic.
>
