RE: Spark join and large temp files

2016-08-12 Thread Ashic Mahtab
Hi Gourav,
Thanks for your input. As mentioned previously, we've tried the broadcast. We failed to broadcast 1.5GB... perhaps some tuning could help. We see CPU go up to 100%, and then workers die during the broadcast. I'm not sure it's a good idea to broadcast that much, as Spark's broadcast hint by default uses a threshold of just 10MB to decide whether to broadcast or not.
As for Redis, we don't need a separate Redis cluster or anything. We're using embedded Redis on the driver, which lives for the duration of the job. It's essentially a way to have some memory on the driver that can accommodate 1.5GB and allows access over the network. https://github.com/kstyrc/embedded-redis makes this trivial to do.
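As a rough sketch of the pattern (not the thread's actual code; the embedded-redis and Jedis usage in the comments is an assumption based on those projects' READMEs, and names like `smallPairs` are made up):

```scala
// Driver side (sketch): start an embedded Redis and load the small dataset:
//   val server = new redis.embedded.RedisServer(6379); server.start()
//   smallPairs.foreach { case (id, n) => jedis.set(id, n.toString) }
// Executor side: inside bigRDD.mapPartitions, open one client per partition
// and do a plain lookup per record, so the 1.5TB side is never shuffled.
// The shuffle-free core of that lookup join, independent of Spark and Redis:
def lookupJoin(big: Iterator[(String, String)],        // (id, name)
               lookup: String => Option[String])      // stand-in for Redis GET
    : Iterator[(String, String, String)] =            // (id, number, name)
  big.flatMap { case (id, name) => lookup(id).map(num => (id, num, name)) }
```

With a Jedis client, `lookup` would be something like `id => Option(jedis.get(id))`.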
I don't know if this is a 2011 way of solving the problem or not, but 
http://www.slideshare.net/databricks/strata-sj-everyday-im-shuffling-tips-for-writing-better-spark-programs
suggests that a good approach to joining a huge dataset with one that can't be made smaller is using a database. We've gone with that, and it seems to be working. We've tried all the other recommendations (broadcasting the dataframe as part of the join, explicitly broadcasting a hashmap from the driver, registering temp tables, etc.), and nothing else has worked. The parquet dataframe doesn't have a partitioner when loaded, and any operation requiring a network shuffle fills up the temp disk. Within these constraints, the database approach turned out to be the only thing we could get working (without paying double / treble for nodes that have more disk space to hold the temp files).
Regards,
Ashic.

From: gourav.sengu...@gmail.com
Date: Thu, 11 Aug 2016 21:52:07 +0100
Subject: Re: Spark join and large temp files
To: bteeu...@gmail.com
CC: user@spark.apache.org

The point is that if you have skewed data, then one single reducer will end up taking a very long time. You don't even need to try this; just search Google: skewed data is a known problem in joins, even in Spark.
Therefore, instead of using a join, if the use case permits, just write a UDF that works as a lookup. Using broadcast is the Spark way; someone here mentioned the use of Redis, which I remember used to be the workaround in 2011, in the early days of MapReduce.
Regards,
Gourav
On Thu, Aug 11, 2016 at 9:24 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
Hmm, hashing will probably send all of the records with the same key to the same partition / machine.
I'd try it out, and hope that if you have a few super-large keys bigger than the RAM available on one node, they spill to disk. Maybe play with persist() and a different storage level.
On Aug 11, 2016, at 9:48 PM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:
Hi Ben,
and that will take care of skewed data?
Gourav 
On Thu, Aug 11, 2016 at 8:41 PM, Ben Teeuwen <bteeu...@gmail.com> wrote:
When you read both 'a' and 'b', can you try repartitioning both by column 'id'?
If you .cache() and .count() to force a shuffle, it'll push the records that will be joined to the same executors.
So:
a = spark.read.parquet('path_to_table_a').repartition('id').cache()
a.count()
b = spark.read.parquet('path_to_table_b').repartition('id').cache()
b.count()
And then join.

On Aug 8, 2016, at 8:17 PM, Ashic Mahtab <as...@live.com> wrote:
Hello,
We have two parquet inputs of the following form:

a: id:String, Name:String  (1.5TB)
b: id:String, Number:Int  (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the RDDs, mapping them to pair RDDs with id as the key, and then joining. What we're seeing is that temp file usage increases during the join stage, filling up our disks and causing the job to crash. Is there a way to join these two datasets without... well... crashing?
Note, the ids are unique, and there's a one-to-one mapping between the two datasets.
Any help would be appreciated.
-Ashic.




  

Re: Spark join and large temp files

2016-08-12 Thread Gourav Sengupta
The point is that if you have skewed data, then one single reducer will end up
taking a very long time. You don't even need to try this; just search Google:
skewed data is a known problem in joins, even in Spark.

Therefore, instead of using a join, if the use case permits, just write a UDF
that works as a lookup. Using broadcast is the Spark way; someone here mentioned
the use of Redis, which I remember used to be the workaround in 2011, in the
early days of MapReduce.

Regards,
Gourav
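A hedged sketch of what this UDF-over-broadcast idea might look like (assuming a 2016-era Spark SQLContext; the column names come from the thread, everything else, including `bcNumbers`, is illustrative, not code from the thread):

```scala
// In Spark (sketch only): broadcast the small (id -> number) map once, then
// look it up in a UDF instead of shuffling the 1.5TB side for a join.
//   import org.apache.spark.sql.functions.udf
//   val bcNumbers = sc.broadcast(
//     b.rdd.map(r => r.getString(0) -> r.getInt(1)).collectAsMap())
//   val numberUdf = udf((id: String) => numberFor(bcNumbers.value)(id))
//   val result = a.withColumn("number", numberUdf(a("id")))
// The UDF body itself is just a map lookup:
def numberFor(bc: collection.Map[String, Int])(id: String): Option[Int] =
  bc.get(id)
```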



Re: Spark join and large temp files

2016-08-12 Thread Gourav Sengupta
Hi Ashic,

That is a pretty 2011 way of solving the problem. What is more painful about
this way of working is that you need to load the data into Redis, keep a Redis
cluster running, and in case you are working across several clusters, perhaps
install Redis in all of them or hammer your driver.

Did you try using UDFs on broadcast data? The solution is pretty much the same,
except that instead of Redis you use the broadcast variable, and it scales
wonderfully across several clusters of machines.


Regards,
Gourav Sengupta



RE: Spark join and large temp files

2016-08-11 Thread Ashic Mahtab
Hi Ben,
Already tried that. The thing is that any form of shuffle on the big dataset (which repartition will cause) puts a node's chunk into /tmp, and that fills up the disk. I solved the problem by storing the 1.5GB dataset in an embedded Redis instance on the driver, and doing a straight flatMap of the big dataset (doing lookups in Redis). This avoids shuffling, and prevents the /tmp fill-up issue.
-Ashic.


Re: Spark join and large temp files

2016-08-11 Thread Ben Teeuwen
Hmm, hashing will probably send all of the records with the same key to the same partition / machine.
I'd try it out, and hope that if you have a few super-large keys bigger than the RAM available on one node, they spill to disk. Maybe play with persist() and a different storage level.



Re: Spark join and large temp files

2016-08-11 Thread Ben Teeuwen
When you read both 'a' and 'b', can you try repartitioning both by column 'id'?
If you .cache() and .count() to force a shuffle, it'll push the records that will be joined to the same executors.

So:
a = spark.read.parquet('path_to_table_a').repartition('id').cache()
a.count()

b = spark.read.parquet('path_to_table_b').repartition('id').cache()
b.count()

And then join.




RE: Spark join and large temp files

2016-08-10 Thread Ashic Mahtab
Already tried that. The CPU hits 100% on the collectAsMap (we even tried foreach-ing into a Java ConcurrentHashMap), and it eventually finishes; but the broadcast takes a while, at some point there's a timeout, and the worker is killed. The driver (and workers) have more than enough RAM (1.5GB of parquet expands to about 4.5GB, and the nodes have 64GB RAM). Filtering is also not an option, as every entry of the "smaller" dataset exists in the large one.
As mentioned in another reply, I managed to get it working by using embedded Redis on the driver, loading the smaller dataset into it, and then doing a straight map on the larger dataset via a foreachPartition, doing lookups against the driver's Redis. Since there's no network shuffle, the temp folder is barely touched, and it seems to work quite well.
-Ashic.
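For reference, the broadcast-related limits being hit here can be raised via configuration. The config names below are real Spark settings, but the values are illustrative guesses, not recommendations from the thread; they would be passed via `--conf` to spark-submit or `SparkConf.set` on the driver:

```scala
// Illustrative only: tuning knobs that govern broadcast size, broadcast
// timeout, driver collect size, and network timeouts. Values are guesses.
val broadcastTuning = Map(
  "spark.sql.autoBroadcastJoinThreshold" -> (2L * 1024 * 1024 * 1024).toString, // default ~10MB
  "spark.sql.broadcastTimeout"           -> "1200", // seconds; default 300
  "spark.driver.maxResultSize"           -> "8g",   // default 1g
  "spark.network.timeout"                -> "600s"  // default 120s
)
```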

From: zouz...@gmail.com
Date: Wed, 10 Aug 2016 08:22:24 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,
I think this approach should solve your problem, i.e., broadcasting the small RDD. However, you should do it properly.
IMO, you should try:

val smallRDDBroadcasted: Broadcast[Map[Int, YourValueType]] =
  sc.broadcast(smallRDD.collectAsMap().toMap)

bigRDD.mapPartitions { elems =>
  // Manually join against the broadcast map
  elems.flatMap { case (key, value) =>
    smallRDDBroadcasted.value.get(key).map(x => (key, (value, x)))
  }
}

Ensure that your driver has enough memory to store the above map. If you run out of memory on the driver, increase its memory.
Speaking of which, a filtering step might also help here, i.e., filtering the bigRDD by the keys of the map before joining.
Hope this helps,
Anastasios
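The filtering step mentioned here can be sketched independently of Spark; in a real job the key set would come from the broadcast map, and the filter would run on the big RDD (names are hypothetical, not from the thread):

```scala
// Keep only big-side records whose key appears in the small side.
// In Spark this would be: bigRDD.filter { case (k, _) => keys.contains(k) }
// with `keys` read from the broadcast variable on each executor.
def filterByKeys[K, V](big: Seq[(K, V)], keys: Set[K]): Seq[(K, V)] =
  big.filter { case (k, _) => keys.contains(k) }
```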

On Tue, Aug 9, 2016 at 4:46 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Sam,
Yup. It seems it stalls when broadcasting. CPU goes to 100%, but there's no progress. The Spark UI doesn't even show up.
-Ashic.

From: samkiller@gmail.com
Date: Tue, 9 Aug 2016 16:21:27 +0200
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: deepakmc...@gmail.com; user@spark.apache.org

Have you tried broadcasting your small table in order to perform the join?

joined = bigDF.join(broadcast(smallDF), "id")

On Tue, Aug 9, 2016 at 3:29 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Deepak,
No... not really. Upping the disk size is a solution, but more expensive, as you can't easily attach EBS volumes to EMR clusters configured with data pipelines (which is what we're doing). I've tried collecting the 1.5G dataset in a hashmap and broadcasting; timeouts seem to prevent that (even after upping the max driver result size). Increasing partition counts didn't help (the shuffle used up the temp space). I'm now looking at some form of clever broadcasting, or maybe falling back to chunking up the input, producing interim output, and unioning the chunks for the final output. Might even try pointing Spark Streaming at the parquet and seeing if that helps.
-Ashic.

From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 17:31:19 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,
Did you find the resolution to this issue? Just curious to know what helped in this scenario.
Thanks,
Deepak


On Tue, Aug 9, 2016 at 12:23 AM, Ashic Mahtab <as...@live.com> wrote:



Hi Deepak,
Thanks for the response.
Registering the temp tables didn't help. Here's what I have:

val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")

a.registerTempTable("a")
b.registerTempTable("b")

val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y on x.id = y.id")

results.write.parquet(...)

Is there something I'm missing?
Cheers,
Ashic.
From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 00:01:32 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: user@spark.apache.org

Register your dataframes as temp tables and then try the join on the temp tables. This should resolve your issue.
Thanks,
Deepak

RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Can you give some outline as to what you mean? Should I broadcast a dataframe and register the broadcasted df as a temp table, then use a lookup UDF in a SELECT query?
I've managed to get it working by loading the 1.5GB dataset into an embedded Redis instance on the driver, and using a mapPartitions on the big dataframe to map it to the required triples, doing the lookup from Redis. It took around 13 minutes to load the data into Redis using 4 cores, and the subsequent map on the main dataset was quite fast.

From: gourav.sengu...@gmail.com
Date: Tue, 9 Aug 2016 21:13:51 +0100
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: mich.talebza...@gmail.com; samkiller@gmail.com; deepakmc...@gmail.com; 
user@spark.apache.org

In case of skewed data, joins will mess things up. Try writing a UDF with the lookup on a broadcast variable and then let me know the results. It should not take more than 40 minutes on a 32 GB RAM system with 6-core processors.

Gourav
On Tue, Aug 9, 2016 at 6:02 PM, Ashic Mahtab <as...@live.com> wrote:



Hi Mich,
Hardware: AWS EMR cluster with 15 r3.2xlarge nodes (CPU and RAM are fine; disk is a couple of hundred gigs).
When we do:

onPointFiveTB.join(onePointFiveGig.cache(), "id")

we're seeing that the temp directory fills up fast, until a node gets killed. And then everything dies.
-Ashic.

From: mich.talebza...@gmail.com
Date: Tue, 9 Aug 2016 17:25:23 +0100
Subject: Re: Spark join and large temp files
To: as...@live.com
CC: samkiller@gmail.com; deepakmc...@gmail.com; user@spark.apache.org

Hi Sam,
What is your Spark hardware spec: number of nodes, RAM per node, and disks, please?
I don't understand this; it should not really be an issue. Under the bonnet it is a hash join. The small table, I gather, can be cached, and the big table will do multiple passes using the temp space.
HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.




Re: Spark join and large temp files

2016-08-09 Thread Gourav Sengupta
In case of skewed data, joins will mess things up. Try writing a UDF with the
lookup on a broadcast variable and then let me know the results. It should not
take more than 40 minutes on a 32 GB RAM system with 6-core processors.

Gourav


RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Hi Mich,
Hardware: AWS EMR cluster with 15 r3.2xlarge nodes (CPU and RAM are fine; disk is a couple of hundred gigs).
When we do:

onPointFiveTB.join(onePointFiveGig.cache(), "id")

we're seeing that the temp directory fills up fast, until a node gets killed. And then everything dies.
-Ashic.


Re: Spark join and large temp files

2016-08-09 Thread Mich Talebzadeh
Hi Sam,

What is your Spark hardware spec: number of nodes, RAM per node, and disks,
please?

I don't see why this should really be an issue. Under the bonnet it is a hash
join: the small table, I gather, can be cached, and the big table will make
multiple passes using the temp space.
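For intuition, the mechanics described above can be sketched in plain Scala (no Spark involved; `Big` and `Small` are hypothetical record shapes standing in for the two parquet inputs): the small side is materialized into an in-memory hash map, and the big side is streamed past it one record at a time, so the big side is never shuffled.

```scala
// Plain-Scala sketch of a broadcast hash join; illustrative only.
object BroadcastHashJoinSketch {
  case class Big(id: String, name: String)    // stands in for the 1.5TB side, streamed
  case class Small(id: String, number: Int)   // stands in for the small side, hashed in memory

  // Build the hash table from the small side once, then make a single pass
  // over the big side, probing the table -- no shuffle of the big side.
  def join(big: Iterator[Big], small: Seq[Small]): Iterator[(String, Int, String)] = {
    val lookup: Map[String, Int] = small.map(s => s.id -> s.number).toMap
    big.flatMap(b => lookup.get(b.id).map(n => (b.id, n, b.name)))
  }

  def main(args: Array[String]): Unit = {
    val small = Seq(Small("a", 1), Small("b", 2))
    val big   = Iterator(Big("a", "alice"), Big("b", "bob"), Big("c", "carol"))
    println(join(big, small).toList)  // prints List((a,1,alice), (b,2,bob))
  }
}
```

In Spark the same idea is what `broadcast(smallDF)` requests; the sketch shows why the broadcast variant trades disk (no shuffle files) for driver and executor memory.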

HTH

Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com






RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Hi Sam,
Yup. It seems it stalls when broadcasting. CPU goes to 100%, but there's no
progress. The Spark UI doesn't even show up.
-Ashic.


Re: Spark join and large temp files

2016-08-09 Thread Sam Bessalah
Have you tried to broadcast your small table in order to perform your join?

val joined = bigDF.join(broadcast(smallDF), Seq("id"))




RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Hi Deepak,
No... not really. Upping the disk size is a solution, but more expensive, as
you can't easily attach EBS volumes to EMR clusters configured with Data
Pipeline (which is what we're doing). I've tried collecting the 1.5G dataset
in a hashmap and broadcasting. Timeouts seem to prevent that (even after
upping the max driver result size). Increasing partition counts didn't help
(the shuffle used up the temp space). I'm now looking at some form of clever
broadcasting, or maybe falling back to chunking up the input, producing
interim output, and unioning them for the final output. Might even try using
Spark Streaming pointed at the parquet and seeing if that helps.
-Ashic.
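The chunk-and-union fallback mentioned above can be illustrated on in-memory collections (plain Scala, not Spark; a sketch of the idea only, not the author's actual implementation): split the small side into pieces small enough to broadcast, join each piece against the big side, and union the interim results.

```scala
// Sketch: join `big` against `small` one chunk at a time, then concatenate
// ("union") the interim results. With Spark, each chunk would be broadcast
// and each interim result written out before the final union.
object ChunkedJoinSketch {
  def chunkedJoin[K, A, B](big: Seq[(K, A)],
                           small: Seq[(K, B)],
                           chunkSize: Int): Seq[(K, A, B)] =
    small.grouped(chunkSize).flatMap { piece =>
      val lookup = piece.toMap    // one "broadcastable" chunk of the small side
      big.collect { case (k, a) if lookup.contains(k) => (k, a, lookup(k)) }
    }.toSeq                       // union of the per-chunk interim outputs

  def main(args: Array[String]): Unit = {
    val big   = Seq(("a", "alice"), ("b", "bob"), ("c", "carol"))
    val small = Seq(("a", 1), ("c", 3))
    println(chunkedJoin(big, small, chunkSize = 1))
  }
}
```

The trade-off is clear from the sketch: the big side is scanned once per chunk, so fewer, larger chunks mean fewer passes but more memory per broadcast.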

From: deepakmc...@gmail.com
Date: Tue, 9 Aug 2016 17:31:19 +0530
Subject: Re: Spark join and large temp files
To: as...@live.com

Hi Ashic,
Did you find the resolution to this issue? Just curious to know what helped in
this scenario.
Thanks,
Deepak



Re: Spark join and large temp files

2016-08-08 Thread Yong Zhang
Join requires shuffling. The problem is that you have to shuffle 1.5TB of
data, which is what blows up your disk usage. Another way is to broadcast the
small 1.5G dataset, so there is no shuffle requirement for the 1.5TB dataset.
But you need to make sure you have enough memory.


Can you try increasing your partition count? That will make each partition of
your 1.5TB side contain less data, so the disk usage of the spilled data may
be lower.


But keep in mind you should always have enough disk space to handle the job
you plan to run.
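For reference, the knobs touched on in this thread correspond to real Spark configuration properties; the values below are illustrative guesses for this workload, not tested recommendations:

```properties
# Planner's broadcast threshold (default ~10MB); raising it lets Spark
# consider broadcasting the small side (memory permitting).
spark.sql.autoBroadcastJoinThreshold=2000000000
# Directory for shuffle and spill files; point it at a volume with headroom.
spark.local.dir=/mnt/spark-tmp
# More shuffle partitions -> less data per partition file.
spark.sql.shuffle.partitions=2000
```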


Yong




RE: Spark join and large temp files

2016-08-08 Thread Ashic Mahtab
Hi Deepak,
Thanks for the response. Registering the temp tables didn't help. Here's what
I have:

val a = sqlContext.read.parquet(...).select("eid.id", "name").withColumnRenamed("eid.id", "id")
val b = sqlContext.read.parquet(...).select("id", "number")

a.registerTempTable("a")
b.registerTempTable("b")

val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x JOIN b y ON x.id = y.id")

results.write.parquet(...)

Is there something I'm missing?

Cheers,
Ashic.

Re: Spark join and large temp files

2016-08-08 Thread Deepak Sharma
Register your dataframes as temp tables and then try the join on the temp
tables. This should resolve your issue.

Thanks
Deepak


-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Spark join and large temp files

2016-08-08 Thread Ashic Mahtab
Hello,
We have two parquet inputs of the following form:

a: id:String, Name:String  (1.5TB)
b: id:String, Number:Int  (1.3GB)

We need to join these two to get (id, Number, Name). We've tried two approaches:

a.join(b, Seq("id"), "right_outer")

where a and b are dataframes. We also tried taking the RDDs, mapping them to
pair RDDs with id as the key, and then joining. What we're seeing is that temp
file usage increases during the join stage, filling up our disks and causing
the job to crash. Is there a way to join these two data sets without,
well... crashing?

Note, the ids are unique, and there's a one-to-one mapping between the two
datasets.

Any help would be appreciated.

-Ashic.