Why are you joining the rdd with itself?

You can try these things:

- Change the StorageLevel of both RDDs to MEMORY_AND_DISK_2 or
MEMORY_AND_DISK_SER, so that Spark doesn't need to keep everything in memory.

- Set your default serializer to Kryo (.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer"))

- Enable RDD compression (.set("spark.rdd.compress","true")), which compresses
serialized partitions (e.g. with a *_SER storage level). A rough sketch combining
these settings follows below.
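
Here's a minimal sketch of how the three settings above could be wired
together. The app name and sample data are placeholders, and rdd stands in
for your (Int, (Int, Int)) pair RDD; this is just to show where each setting
goes, not tuned values:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._  // pair RDD implicits on older Spark versions
    import org.apache.spark.storage.StorageLevel

    val conf = new SparkConf()
      .setAppName("CoOccurrenceCount") // placeholder name
      // Kryo is generally faster and more compact than Java serialization
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // compress serialized RDD partitions, trading CPU for memory
      .set("spark.rdd.compress", "true")
    val sc = new SparkContext(conf)

    // stand-in for the real (Int, (Int, Int)) pair RDD
    val rdd = sc.parallelize(Seq((1, (2, 3)), (1, (4, 5)), (2, (2, 3))))

    // serialized storage that can spill to disk instead of holding
    // everything as deserialized objects in memory
    rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

    val rdd2 = rdd.join(rdd, numPartitions = 1000)
      .map(fp => ((fp._2._1, fp._2._2), 1))
      .reduceByKey(_ + _)
    rdd2.count()

Note that persisting and compressing mainly help the stored RDD data; the
~460 million records produced by the self-join still have to go through the
shuffle, so this may not be enough on its own with 1280m per executor.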


Thanks
Best Regards

On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn <twal...@gmail.com> wrote:

> Hi All,
>
> I'm a new Spark (and Hadoop) user and I want to find out if the cluster
> resources I am using are feasible for my use-case. The following is a
> snippet of code that is causing an OOM exception in the executor after about
> 125/1000 tasks during the map stage.
>
> > val rdd2 = rdd.join(rdd, numPartitions=1000)
> > .map(fp=>((fp._2._1, fp._2._2), 1))
> > .reduceByKey((x,y)=>x+y)
> > rdd2.count()
>
> Which errors with a stack trace like:
>
> > 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
> stage 2.0 (TID 498)
> > java.lang.OutOfMemoryError: GC overhead limit exceeded
> >         at
> scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
> >         at
> scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
> >         at
> scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
> >         at
> scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
> >         at scala.collection.immutable.List.foreach(List.scala:318)
>
> rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
> co-occurring values by key in the dataset, i.e. 'These two numbers occurred
> with the same key n times'. I intentionally don't want to filter out
> duplicates and reflections. rdd is about 3.6 million records, which has a
> size in memory of about 120MB, and results in a 'joined' RDD (before the
> reduceByKey stage) of around 460 million records, with a size in memory of
> about 35GB.
>
> My cluster setup is as follows. I have 3 nodes, where each node has 2
> cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
> executors are allowed 1280m each and the job has 5 executors and 1 driver.
> Additionally, I have set spark.storage.memoryFraction to 0.06, and
> spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
> the issue. I've also tried increasing the number of partitions after the
> join dramatically (up to 15000). Nothing has been effective. Thus, I'm
> beginning to suspect I don't have enough resources for the job.
>
> Does anyone have a feeling about what the resource requirements would be
> for a use-case like this? I could scale the cluster up if necessary, but
> would like to avoid it. I'm willing to accept longer computation times if
> that is an option.
>
> Warm Regards,
> Thomas
>
>
