RE: Spark join and large temp files

2016-08-12 Thread Ashic Mahtab
we could get working (without paying double / treble for nodes that have more disk space to hold the temp files). Regards, Ashic. From: gourav.sengu...@gmail.com Date: Thu, 11 Aug 2016 21:52:07 +0100 Subject: Re: Spark join and large temp files To: bteeu...@gmail.com CC: user@spark.apache.org

Re: Spark join and large temp files

2016-08-12 Thread Gourav Sengupta
The point is that if you have skewed data, then a single reducer will eventually take a very long time; you do not even need to try this, just search Google and you will find that skewed data is a known problem in joins, even in Spark. Therefore, instead of using a join, in case the use case permits, just write a
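A quick sketch of how one could check whether the join key really is skewed before redesigning the job; the path and the column name "id" are assumptions, and sqlContext is assumed to come from a spark-shell session:

import org.apache.spark.sql.functions.desc

val big = sqlContext.read.parquet("path_to_big_table")
// Count rows per id and inspect the heaviest keys; a handful of ids owning a
// large share of the big table is exactly the skew being described here.
big.groupBy("id").count().orderBy(desc("count")).show(20)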

Re: Spark join and large temp files

2016-08-12 Thread Gourav Sengupta
the driver, and doing a straight flatMap of the big dataset (doing lookups in Redis). This avoids shuffling, and prevents the /tmp fill-up issue. -Ashic. ---------- Subject: Re: Spark join and large temp files From: bteeu...@gmail.com

RE: Spark join and large temp files

2016-08-11 Thread Ashic Mahtab
of the big dataset (doing lookups in redis). This avoids shuffling, and prevents the /tmp fill-up issue. -Ashic. Subject: Re: Spark join and large temp files From: bteeu...@gmail.com Date: Thu, 11 Aug 2016 22:24:42 +0200 CC: user@spark.apache.org To: gourav.sengu...@gmail.com Hmm, hashing
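A minimal sketch of the flatMap-with-Redis-lookups approach described here, assuming a Jedis client, an illustrative host and port, and that the small (id -> number) set has already been loaded into Redis from the driver; sqlContext is assumed from a spark-shell session:

import redis.clients.jedis.Jedis

val big = sqlContext.read.parquet("path_to_big_table") // columns: id, name

// flatMap over the big side only; no shuffle, so the executors' /tmp
// directories never fill up with shuffle files.
val joined = big.rdd.mapPartitions { rows =>
  val redis = new Jedis("redis-host", 6379)   // one connection per partition
  val resolved = rows.flatMap { r =>
    val id = r.getString(0)
    Option(redis.get(id)).map(num => (id, num.toInt, r.getString(1)))
  }.toList                                    // materialise before closing
  redis.close()
  resolved.iterator
}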

Re: Spark join and large temp files

2016-08-11 Thread Ben Teeuwen
Hmm, hashing will probably send all of the records with the same key to the same partition / machine. I'd try it out, and hope that if you have a few super-large keys bigger than the RAM available on one node, they spill to disk. Maybe play with persist() and using a different StorageLevel.
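One way to "play with persist() and a different storage level", as a sketch; the path, partition count, and column name are assumptions:

import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK_SER keeps serialised blocks in memory and spills the rest
// to local disk, which helps when a few very large keys exceed one executor's RAM.
val partitioned = sqlContext.read.parquet("path_to_table_a")
  .repartition(200, col("id"))
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
partitioned.count() // materialise under the chosen storage level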

Re: Spark join and large temp files

2016-08-11 Thread Ben Teeuwen
When you read both 'a' and 'b', can you try repartitioning both by column 'id'? If you .cache() and .count() to force a shuffle, it'll push the records that will be joined to the same executors. So: a = spark.read.parquet('path_to_table_a').repartition('id').cache() a.count() b =
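The snippet above looks like PySpark and is cut off at "b ="; a Scala rendering of the full idea, with the paths and the final join added as assumptions (spark is a Spark 2.x shell session):

import org.apache.spark.sql.functions.col

val a = spark.read.parquet("path_to_table_a").repartition(col("id")).cache()
a.count() // force the shuffle so 'a' is laid out by id
val b = spark.read.parquet("path_to_table_b").repartition(col("id")).cache()
b.count()

// Both sides are now laid out by id, which is what the suggestion above relies on.
val joined = a.join(b, Seq("id"), "right_outer")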

RE: Spark join and large temp files

2016-08-10 Thread Ashic Mahtab
2:24 +0200 Subject: Re: Spark join and large temp files To: as...@live.com Hi Ashic, I think this approach should solve your problem, i.e., by broadcasting the small RDD. However, you should do it properly. IMO, you should try val smallRDDBroadcasted: Map[Int, YourTypeValue] = sc
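The mail is cut off at "sc"; one way the suggestion might continue, as a sketch (YourTypeValue and the sample data are placeholders, and sc is assumed from a spark-shell session):

case class YourTypeValue(number: Int)

val smallRDD = sc.parallelize(Seq(1 -> YourTypeValue(10), 2 -> YourTypeValue(20)))
val bigRDD   = sc.parallelize(Seq((1, "alice"), (2, "bob"), (3, "carol")))

// Pull the small RDD onto the driver and ship it to every executor once.
val smallRDDBroadcasted: Map[Int, YourTypeValue] = smallRDD.collectAsMap().toMap
val broadcasted = sc.broadcast(smallRDDBroadcasted)

// Map-side join against the broadcast value; the big RDD is never shuffled.
val joined = bigRDD.flatMap { case (id, name) =>
  broadcasted.value.get(id).map(v => (id, v.number, name))
}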

RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Subject: Re: Spark join and large temp files To: as...@live.com CC: mich.talebza...@gmail.com; samkiller@gmail.com; deepakmc...@gmail.com; user@spark.apache.org In case of skewed data, the joins will mess things up. Try to write a UDF with the lookup on a broadcast variable and then let me know
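A sketch of the UDF-over-a-broadcast-variable idea; the column names, paths, and the sqlContext/sc from a spark-shell session are assumptions:

import org.apache.spark.sql.functions.udf

val small = sqlContext.read.parquet("path_to_small_table") // columns: id, number
val big   = sqlContext.read.parquet("path_to_big_table")   // columns: id, name

val lookup = sc.broadcast(
  small.rdd.map(r => r.getString(0) -> r.getInt(1)).collectAsMap())

// Resolve the number per row via the broadcast map; unmatched ids become null.
val numberFor = udf((id: String) => lookup.value.get(id))
val joined = big.withColumn("number", numberFor(big("id")))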

Re: Spark join and large temp files

2016-08-09 Thread Gourav Sengupta
filling up fast, until a node gets killed. And then everything dies. -Ashic. -- From: mich.talebza...@gmail.com Date: Tue, 9 Aug 2016 17:25:23 +0100 Subject: Re: Spark join and large temp files To: as...@live.com CC: samkiller@gmail

RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
s. -Ashic. From: mich.talebza...@gmail.com Date: Tue, 9 Aug 2016 17:25:23 +0100 Subject: Re: Spark join and large temp files To: as...@live.com CC: samkiller@gmail.com; deepakmc...@gmail.com; user@spark.apache.org Hi Sam, What is your Spark hardware spec: number of nodes, RAM per node, and disks, pleas

Re: Spark join and large temp files

2016-08-09 Thread Mich Talebzadeh
9 Aug 2016 16:21:27 +0200 Subject: Re: Spark join and large temp files To: as...@live.com CC: deepakmc...@gmail.com; user@spark.apache.org Have you tried to broadcast your small table in order to perform your join? joined = bigDF.join(broadcast
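Completing the truncated line as a sketch; the paths are assumptions, and the join type is switched to an inner join here because Spark will not broadcast the right-hand side of a right outer join:

import org.apache.spark.sql.functions.broadcast

val bigDF   = sqlContext.read.parquet("path_to_big_table")   // id, name
val smallDF = sqlContext.read.parquet("path_to_small_table") // id, number

// The hint asks the planner to ship smallDF to every executor, so the large
// side is never shuffled to disk.
val joined = bigDF.join(broadcast(smallDF), Seq("id"))
joined.write.parquet("path_to_output")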

RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
Hi Sam, Yup. It seems it stalls when broadcasting. CPU goes to 100%, but there's no progress. The Spark UI doesn't even show up. -Ashic. From: samkiller@gmail.com Date: Tue, 9 Aug 2016 16:21:27 +0200 Subject: Re: Spark join and large temp files To: as...@live.com CC: deepakmc...@gmail.com

Re: Spark join and large temp files

2016-08-09 Thread Sam Bessalah
g pointing to the parquet and seeing if that helps. -Ashic. -- From: deepakmc...@gmail.com Date: Tue, 9 Aug 2016 17:31:19 +0530 Subject: Re: Spark join and large temp files To: as...@live.com Hi Ashic Did you fi

RE: Spark join and large temp files

2016-08-09 Thread Ashic Mahtab
. Might even try using Spark Streaming pointing to the parquet and seeing if that helps. -Ashic. From: deepakmc...@gmail.com Date: Tue, 9 Aug 2016 17:31:19 +0530 Subject: Re: Spark join and large temp files To: as...@live.com Hi Ashic, Did you find the resolution to this issue? Just curious to know

Re: Spark join and large temp files

2016-08-08 Thread Yong Zhang
Mahtab <as...@live.com> Sent: Monday, August 8, 2016 2:53 PM To: Deepak Sharma Cc: Apache Spark Subject: RE: Spark join and large temp files Hi Deepak, Thanks for the response. Registering the temp tables didn't help. Here's what I have: val a = sqlContext.read.parquet(...).select("

RE: Spark join and large temp files

2016-08-08 Thread Ashic Mahtab
"number") a.registerTempTable("a") b.registerTempTable("b") val results = sqlContext.sql("SELECT x.id, x.name, y.number FROM a x join b y on x.id=y.id") results.write.parquet(...) Is there something I'm missing? Cheers, Ashic. From: deepakmc...@gmail.com Date: Tue, 9 Au
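The same snippet laid out readably, as a reconstruction; the elided paths are placeholders:

val a = sqlContext.read.parquet("path_to_a").select("id", "name")
val b = sqlContext.read.parquet("path_to_b").select("id", "number")

a.registerTempTable("a")
b.registerTempTable("b")

val results = sqlContext.sql(
  "SELECT x.id, x.name, y.number FROM a x JOIN b y ON x.id = y.id")

results.write.parquet("path_to_output")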

Re: Spark join and large temp files

2016-08-08 Thread Deepak Sharma
Register your dataframes as temp tables and then try the join on the temp tables. This should resolve your issue. Thanks, Deepak. On Mon, Aug 8, 2016 at 11:47 PM, Ashic Mahtab wrote: Hello, We have two parquet inputs of the following form: a: id:String, Name:String (1.5TB)

Spark join and large temp files

2016-08-08 Thread Ashic Mahtab
Hello, We have two parquet inputs of the following form: a: id:String, Name:String (1.5TB) b: id:String, Number:Int (1.3GB) We need to join these two to get (id, Number, Name). We've tried two approaches: a.join(b, Seq("id"), "right_outer") where a and b are dataframes. We also tried taking the
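The first approach, written out as a sketch (the paths are placeholders):

val a = sqlContext.read.parquet("path_to_a") // id: String, Name: String (~1.5 TB)
val b = sqlContext.read.parquet("path_to_b") // id: String, Number: Int (~1.3 GB)

// A plain shuffle join: both sides are hashed by "id" across the cluster,
// and the shuffle files for the 1.5 TB side are what fill up the local temp dirs.
val joined = a.join(b, Seq("id"), "right_outer")
  .select("id", "Number", "Name")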