Thanks for the reply, I'll try your suggestions.
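Just so I apply them correctly, here is roughly what I have in mind (a
quick sketch of my reading of your suggestions, so shout if I've
misunderstood anything):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // Kryo as default serializer
  .set("spark.rdd.compress", "true")                                     // compress serialized RDD blocks
val sc = new SparkContext(conf)

// Persist the rdd being joined so its blocks can spill to disk instead of
// everything having to stay in memory.
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)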

Apologies, in my previous post I was mistaken: rdd is actually a PairRDD
of (Int, Int). I'm doing the self-join so I can count two things. First, I
can count the number of times a value appears in the data set. Second, I
can count the number of times a pair of values occurs under the same key.
For example, if I have the following:

(1,2)
(1,3)
(4,3)

Then joining with itself I get:

(1,(2,2)) - map - ((2,2),1) - reduceByKey - ((2,2),1)
(1,(2,3)) - map - ((2,3),1) - reduceByKey - ((2,3),1)
(1,(3,2)) - map - ((3,2),1) - reduceByKey - ((3,2),1)
(1,(3,3)) - map - ((3,3),1) \
(4,(3,3)) - map - ((3,3),1) /- reduceByKey - ((3,3),2)

Note that I want to keep duplicate pairs like (2,2) as well as reflections
like (2,3) and (3,2).
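
In code, the whole thing is essentially this (a trimmed-down sketch of the
job from my first mail, run on the tiny example above):

val rdd = sc.parallelize(Seq((1, 2), (1, 3), (4, 3)))

// Self-join on the key, keeping duplicates and both orderings of each pair.
val coCounts = rdd.join(rdd)
  .map { case (_, (v1, v2)) => ((v1, v2), 1) }
  .reduceByKey(_ + _)   // ((v1, v2), number of keys under which they co-occur)

// The diagonal entries give the per-value counts in this example,
// e.g. ((3,3),2) because the value 3 appears twice in the data set.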

Rgds

On 18 February 2015 at 09:00, Akhil Das <ak...@sigmoidanalytics.com> wrote:

> Why are you joining the rdd with itself?
>
> You can try these things:
>
> - Change the StorageLevel of both rdds to MEMORY_AND_DISK_2 or
> MEMORY_AND_DISK_SER, so that it doesn't need to keep everything in memory.
>
> - Set your default Serializer to Kryo (.set("spark.serializer",
> "org.apache.spark.serializer.KryoSerializer"))
>
> - Enable rdd compression (.set("spark.rdd.compress","true"))
>
>
> Thanks
> Best Regards
>
> On Wed, Feb 18, 2015 at 12:21 PM, Tom Walwyn <twal...@gmail.com> wrote:
>
>> Hi All,
>>
>> I'm a new Spark (and Hadoop) user and I want to find out if the cluster
>> resources I am using are feasible for my use-case. The following is a
>> snippet of code that is causing an OOM exception in the executor after about
>> 125/1000 tasks during the map stage.
>>
>> > val rdd2 = rdd.join(rdd, numPartitions=1000)
>> > .map(fp=>((fp._2._1, fp._2._2), 1))
>> > .reduceByKey((x,y)=>x+y)
>> > rdd2.count()
>>
>> Which errors with a stack trace like:
>>
>> > 15/02/17 16:30:11 ERROR executor.Executor: Exception in task 98.0 in
>> stage 2.0 (TID 498)
>> > java.lang.OutOfMemoryError: GC overhead limit exceeded
>> >         at
>> scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:168)
>> >         at
>> scala.collection.mutable.ListBuffer.$plus$eq(ListBuffer.scala:45)
>> >         at
>> scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
>> >         at
>> scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:48)
>> >         at scala.collection.immutable.List.foreach(List.scala:318)
>>
>> rdd is a PairRDD of (Int, (Int, Int)). The idea is to get the count of
>> co-occurring values by key in the dataset, i.e. 'These two numbers occurred
>> with the same key n times'. I intentionally don't want to filter out
>> duplicates and reflections. rdd is about 3.6 million records, which has a
>> size in memory of about 120MB, and results in a 'joined' RDD (before the
>> reduceByKey stage) of around 460 million records, with a size in memory of
>> about 35GB.
>>
>> My cluster setup is as follows. I have 3 nodes, where each node has 2
>> cores and about 7.5GB of memory. I'm running Spark on YARN. The driver and
>> executors are allowed 1280m each and the job has 5 executors and 1 driver.
>> Additionally, I have set spark.storage.memoryFraction to 0.06, and
>> spark.shuffle.memoryFraction to 0.65 in the hopes that this would mitigate
>> the issue. I've also tried increasing the number of partitions after the
>> join dramatically (up to 15000). Nothing has been effective. Thus, I'm
>> beginning to suspect I don't have enough resources for the job.
>>
>> Does anyone have a feeling about what the resource requirements would be
>> for a use-case like this? I could scale the cluster up if necessary, but
>> would like to avoid it. I'm willing to accept longer computation times if
>> that is an option.
>>
>> Warm Regards,
>> Thomas
>>
>>
>
