Java's default serialization is neither the fastest nor the most
memory-efficient way of handling ser/deser. Did you try switching to Kryo
serialization? See
https://ogirardot.wordpress.com/2015/01/09/changing-sparks-default-java-serialization-to-kryo/
if you need a tutorial.

This should help with both CPU and memory usage. If you want the best
performance, turn on registration too and manually register the classes
you need.
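As a rough sketch (assuming a Scala driver; which classes to register
depends on your data, the tuple type below is just taken from your example):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    classOf[Array[((String, String), (String, String))]],
    classOf[((String, String), (String, String))]))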
Also, if your data is key-value and clearly partitioned, you can try
https://github.com/amplab/spark-indexedrdd
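Roughly, per the IndexedRDD README (keys must be Longs; the values here are
just an example):

import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD
import edu.berkeley.cs.amplab.spark.indexedrdd.IndexedRDD._

val pairs = sc.parallelize((1L to 1000000L).map(x => (x, 0)))
val indexed = IndexedRDD(pairs).cache()
indexed.get(1234L)  // point lookup without scanning the whole RDD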

cheers,
Olivier.

2016-03-08 6:09 GMT+01:00 Arash <aras...@gmail.com>:

> The driver memory is set to 40G, and the OOM seems to be happening on the
> executors. I might try a different broadcast block size (vs. the 4m
> default), as Takeshi suggested, to see if it makes a difference.
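> As a sketch, the block size could be changed on the SparkConf before
> building the context (8m is just an arbitrary alternative to try):
>
> conf.set("spark.broadcast.blockSize", "8m")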
>
> On Mon, Mar 7, 2016 at 6:54 PM, Tristan Nixon <st...@memeticlabs.org>
> wrote:
>
>> Yeah, the Spark engine is pretty clever, and it's best not to prematurely
>> optimize. It would be interesting to profile your join vs. the collect on
>> the smaller dataset. I suspect that the join is faster (even before you
>> broadcast it back out).
>>
>> I’m also curious about the broadcast OOM - did you try expanding the
>> driver memory?
>>
>> On Mar 7, 2016, at 8:28 PM, Arash <aras...@gmail.com> wrote:
>>
>> So I just implemented the logic through a standard join (without collect
>> and broadcast) and it's working great.
>>
>> The idea behind trying the broadcast was that, since the other side of
>> the join is a much larger dataset, the process might be faster with
>> collect and broadcast, as that avoids shuffling the bigger dataset.
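>> As a sketch of the two alternatives (with hypothetical pair RDDs `big`
>> and `small`):
>>
>> val viaShuffle = big.join(small)  // standard join: shuffles both sides
>>
>> // collect + broadcast: avoids shuffling `big`, but the driver and every
>> // executor must hold all of `small` in memory
>> val smallBc = sc.broadcast(small.collectAsMap())
>> val viaBroadcast = big.flatMap { case (k, v) =>
>>   smallBc.value.get(k).map(w => (k, (v, w)))
>> }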
>>
>> I think the join is working much better in this case, so I'll probably
>> just use that. Still, I'm a bit curious as to why the error is happening.
>>
>> On Mon, Mar 7, 2016 at 5:55 PM, Tristan Nixon <st...@memeticlabs.org>
>> wrote:
>>
>>> I’m not sure I understand - if it was already distributed over the
>>> cluster in an RDD, why would you want to collect and then re-send it as a
>>> broadcast variable? Why not simply use the RDD that is already distributed
>>> on the worker nodes?
>>>
>>> On Mar 7, 2016, at 7:44 PM, Arash <aras...@gmail.com> wrote:
>>>
>>> Hi Tristan,
>>>
>>> This is not static, I actually collect it from an RDD to the driver.
>>>
>>> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon <st...@memeticlabs.org>
>>> wrote:
>>>
>>>> Hi Arash,
>>>>
>>>> is this static data?  Have you considered including it in your jars and
>>>> de-serializing it from jar on each worker node?
>>>> It’s not pretty, but it’s a workaround for serialization troubles.
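>>>> Something along these lines, assuming the array was pre-serialized into
>>>> a /tuples.ser resource bundled in the jar (the path is hypothetical):
>>>>
>>>> object TupleData {
>>>>   // deserialized lazily, once per executor JVM, from the classpath
>>>>   lazy val tuples: Array[((String, String), (String, String))] = {
>>>>     val in = new java.io.ObjectInputStream(
>>>>       getClass.getResourceAsStream("/tuples.ser"))
>>>>     try in.readObject().asInstanceOf[Array[((String, String), (String, String))]]
>>>>     finally in.close()
>>>>   }
>>>> }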
>>>>
>>>> On Mar 7, 2016, at 5:29 PM, Arash <aras...@gmail.com> wrote:
>>>>
>>>> Hello all,
>>>>
>>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>>>> but haven't been able to make it work so far.
>>>>
>>>> It looks like the executors start to run out of memory during
>>>> deserialization. This behavior only shows itself when the number of
>>>> partitions is above a few tens; the broadcast does work for 10 or 20
>>>> partitions.
>>>>
>>>> I'm using the following setup to observe the problem:
>>>>
>>>> val tuples: Array[((String, String), (String, String))]   // ~10M tuples
>>>> val tuplesBc = sc.broadcast(tuples)
>>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>>> numsRdd.map(n => tuplesBc.value.head).count()
>>>>
>>>> If I set the number of partitions for numsRdd to 20, the count goes
>>>> through successfully, but at 100, I start to get errors such as:
>>>>
>>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>>>>         at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>>>         at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>         at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>
>>>>
>>>> I'm using Spark 1.5.2. Cluster nodes are Amazon r3.2xlarge instances.
>>>> The Spark property maximizeResourceAllocation is set to true
>>>> (executor.memory = 48G according to the Spark UI environment page).
>>>> We're also using Kryo serialization, and YARN is the resource manager.
>>>>
>>>> Any ideas as what might be going wrong and how to debug this?
>>>>
>>>> Thanks,
>>>> Arash
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
>


-- 
Olivier Girardot | Partner
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94
