Hi Arash,

Is this static data? Have you considered including it in your jars and 
deserializing it from the jar on each worker node?
It’s not pretty, but it’s a workaround for serialization troubles.
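
For what it's worth, here is a minimal sketch of that workaround, assuming the
array has already been written out with plain Java serialization into the jar at
a hypothetical resource path /data/tuples.bin (the object name and path below
are made up, adjust to your layout):

import java.io.{BufferedInputStream, ObjectInputStream}

// Hypothetical holder: a lazy val in an object is initialized at most once
// per JVM, i.e. once per executor rather than once per task.
object StaticTuples {
  lazy val value: Array[((String, String), (String, String))] = {
    val stream = getClass.getResourceAsStream("/data/tuples.bin") // bundled in the jar
    require(stream != null, "/data/tuples.bin not found on the classpath")
    val in = new ObjectInputStream(new BufferedInputStream(stream))
    try in.readObject().asInstanceOf[Array[((String, String), (String, String))]]
    finally in.close()
  }
}

Tasks then reference StaticTuples.value directly, so nothing needs to be
broadcast or shipped with the task closures.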

> On Mar 7, 2016, at 5:29 PM, Arash <aras...@gmail.com> wrote:
> 
> Hello all,
> 
> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes but 
> haven't been able to make it work so far.
> 
> It looks like the executors start to run out of memory during 
> deserialization. This behavior only shows up once the number of partitions 
> grows beyond a few tens; the broadcast does work with 10 or 20 partitions.
> 
> I'm using the following setup to observe the problem:
> 
> val tuples: Array[((String, String), (String, String))]      // ~ 10M tuples
> val tuplesBc = sc.broadcast(tuples)
> val numsRdd = sc.parallelize(1 to 5000, 100)
> numsRdd.map(n => tuplesBc.value.head).count()
> 
> If I set the number of partitions for numsRdd to 20, the count goes through 
> successfully, but at 100 I start to get errors such as:
> 
> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>         at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>         at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
> 
> 
> I'm using Spark 1.5.2. Cluster nodes are Amazon EC2 r3.2xlarge instances. The 
> Spark property maximizeResourceAllocation is set to true (executor.memory = 48G 
> according to the Spark UI environment page). We're also using Kryo 
> serialization, and YARN is the resource manager.
> 
> Any ideas as to what might be going wrong and how to debug this?
> 
> Thanks,
> Arash
> 
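
On the write side, the bundled file could be produced once on the driver before
packaging. Again just a sketch with a made-up path, using plain Java
serialization (the target directory is assumed to exist):

import java.io.{BufferedOutputStream, FileOutputStream, ObjectOutputStream}

// One-off step: serialize the ~10M-tuple array so the build packs it into the
// jar (e.g. under src/main/resources, which sbt and maven bundle automatically).
val out = new ObjectOutputStream(
  new BufferedOutputStream(new FileOutputStream("src/main/resources/data/tuples.bin")))
try out.writeObject(tuples)
finally out.close()

The repro above would then read StaticTuples.value.head inside the map instead
of tuplesBc.value.head, with no broadcast involved.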
