Thanks for the reply, I'll try your suggestions.

Apologies, in my previous post I was mistaken: rdd is actually a PairRDD of (Int, Int). I'm doing the self-join so I can count two things. First, the number of times a value appears in the data set. Second, the number of times pairs of values occur with the same key. For example, if I have the following:
(1,2)
(1,3)
(4,3)

Then joining with itself I get:

(1,(2,2)) - map - ((2,2),1) - reduceByKey - ((2,2),1)
(1,(2,3)) - map - ((2,3),1) - reduceByKey - ((2,3),1)
(1,(3,2)) - map - ((3,2),1) - reduceByKey - ((3,2),1)
(1,(3,3)) - map - ((3,3),1) - reduceByKey - ((3,3),2)
(4,(3,3)) - map - ((3,3),1) _|

Note that I want to keep the duplicates (2,2) and the reflections. There is a small runnable sketch of this pipeline at the bottom of this mail.

Rgds

On 18 February 2015 at 09:00, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Why are you joining the rdd with itself?
>
> You can try these things:
>
> - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
> MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.
>
> - Set your default Serializer to Kryo (.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer"))
>
> - Enable rdd compression (.set("spark.rdd.compress","true"))
>
> Thanks
> Best Regards
>
> On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn <twal...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm a new Spark (and Hadoop) user and I want to find out if the cluster
>> resources I am using are feasible for my use-case. The following is a
>> snippet of code that is causing an OOM exception in the executor after
>> about 125/1000 tasks during the map stage.
>>
>> > val rdd2 = rdd.join(rdd, numPartitions=1000)
>> >   .map(fp=>((fp._2._1, fp._2._2), 1))
>> >   .reduceByKey((x,y)=>x+y)
>> > rdd2.count()
>>
>> which fails with a stack trace like:
>>
>> > 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
>> > java.lang.OutOfMemoryError: GC overhead limit exceeded
>> >     at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
>> >     at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
>> >     at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
>> >     at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
>> >     at scala.collection.immutable.List.foreach(List.scala:318)
>>
>> rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
>> co-occurring values by key in the dataset, i.e. 'these two numbers occurred
>> with the same key n times'. I intentionally don't want to filter out
>> duplicates and reflections. rdd is about 3.6 million records, which has a
>> size in memory of about 120MB, and results in a 'joined' RDD (before the
>> reduceByKey stage) of around 460 million records, with a size in memory of
>> about 35GB.
>>
>> My cluster setup is as follows. I have 3 nodes, where each node has 2
>> cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
>> executors are allowed 1280m each, and the job has 5 executors and 1 driver.
>> Additionally, I have set spark.storage.memoryFraction to 0.06 and
>> spark.shuffle.memoryFraction to 0.65 in the hope that this would mitigate
>> the issue. I've also tried increasing the number of partitions after the
>> join dramatically (up to 15000). Nothing has been effective, so I'm
>> beginning to suspect I don't have enough resources for the job.
>>
>> Does anyone have a feeling for what the resource requirements would be
>> for a use-case like this? I could scale the cluster up if necessary, but
>> would like to avoid it. I'm willing to accept longer computation times if
>> that is an option.
>>
>> Warm Regards,
>> Thomas
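
PS: for reference, here is a minimal, self-contained sketch of the pipeline on the toy data above, with your configuration suggestions (Kryo serializer, RDD compression, MEMORY_AND_DISK_SER persistence) folded in. The object name and the local master are just placeholders for a quick local run, not the actual YARN job.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits on older Spark versions
import org.apache.spark.storage.StorageLevel

object CoOccurrenceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("co-occurrence-sketch")
      .setMaster("local[*]")  // placeholder; point at the real cluster instead
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.rdd.compress", "true")
    val sc = new SparkContext(conf)

    // Toy data from the example above: a PairRDD of (Int, Int).
    val rdd = sc.parallelize(Seq((1, 2), (1, 3), (4, 3)))
    rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)  // spill to disk rather than keep it all in memory

    // The self-join keeps the duplicate (2,2) and the reflections (2,3)/(3,2).
    val counts = rdd.join(rdd)                // (1,(2,2)) (1,(2,3)) (1,(3,2)) (1,(3,3)) (4,(3,3))
      .map { case (_, pair) => (pair, 1) }    // ((2,2),1) ((2,3),1) ((3,2),1) ((3,3),1) ((3,3),1)
      .reduceByKey(_ + _)                     // ((2,2),1) ((2,3),1) ((3,2),1) ((3,3),2)

    counts.collect().foreach(println)
    sc.stop()
  }
}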