Why are you joining the rdd with itself? You can try these things:
- Change the StorageLevel of both RDDs to MEMORY_AND_DISK_2 or MEMORY_AND_DISK_SER, so that Spark doesn't need to keep everything in memory.
- Set your default serializer to Kryo (.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")).
- Enable RDD compression (.set("spark.rdd.compress", "true")).

There is a short sketch combining these settings below the quoted message.

Thanks
Best Regards

On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn <twal...@gmail.com> wrote:

> Hi All,
>
> I'm a new Spark (and Hadoop) user and I want to find out if the cluster
> resources I am using are feasible for my use-case. The following is a
> snippet of code that is causing an OOM exception in the executor after
> about 125/1000 tasks during the map stage:
>
> > val rdd2 = rdd.join(rdd, numPartitions=1000)
> >   .map(fp => ((fp._2._1, fp._2._2), 1))
> >   .reduceByKey((x, y) => x + y)
> > rdd2.count()
>
> This fails with a stack trace like:
>
> > 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in stage 2.0 (TID 498)
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> >     at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
> >     at scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
> >     at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
> >     at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
> >     at scala.collection.immutable.List.foreach(List.scala:318)
>
> rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
> co-occurring values by key in the dataset, i.e. 'these two numbers occurred
> with the same key n times'. I intentionally don't want to filter out
> duplicates and reflections. rdd is about 3.6 million records, which has a
> size in memory of about 120MB, and results in a 'joined' RDD (before the
> reduceByKey stage) of around 460 million records, with a size in memory of
> about 35GB.
>
> My cluster setup is as follows. I have 3 nodes, where each node has 2
> cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
> executors are allowed 1280m each, and the job has 5 executors and 1 driver.
> Additionally, I have set spark.storage.memoryFraction to 0.06 and
> spark.shuffle.memoryFraction to 0.65 in the hope that this would mitigate
> the issue. I've also tried increasing the number of partitions after the
> join dramatically (up to 15000). Nothing has been effective, so I'm
> beginning to suspect I don't have enough resources for the job.
>
> Does anyone have a feeling about what the resource requirements would be
> for a use-case like this? I could scale the cluster up if necessary, but
> would like to avoid it. I'm willing to accept longer computation times if
> that is an option.
>
> Warm Regards,
> Thomas
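For reference, here is a minimal sketch pulling the three suggestions above together (Spark 1.x Scala API). The application name and the toy input data are made up for illustration; only the property keys, the StorageLevel, and the join/reduceByKey shape come from this thread:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // PairRDD implicits on older Spark versions
import org.apache.spark.storage.StorageLevel

// Illustrative configuration; the property keys are the ones suggested above.
val conf = new SparkConf()
  .setAppName("cooccurrence-count")  // hypothetical application name
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  // Kryo instead of Java serialization
  .set("spark.rdd.compress", "true")  // compress serialized RDD blocks
val sc = new SparkContext(conf)

// Toy stand-in for the real (Int, (Int, Int)) PairRDD described in the question.
val rdd = sc.parallelize(Seq((1, (10, 20)), (1, (30, 40)), (2, (10, 20))))

// Persist serialized, spilling to disk when partitions don't fit in memory.
// (MEMORY_AND_DISK_2 would instead keep deserialized partitions replicated on two nodes.)
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

val rdd2 = rdd.join(rdd, numPartitions = 1000)
  .map(fp => ((fp._2._1, fp._2._2), 1))
  .reduceByKey(_ + _)
rdd2.count()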