Hi,

We have a use case where we broadcast ~4GB of data, and we are on m3.2xlarge, so your object size is not an issue. Also, based on your explanation, this does not look like a broadcast issue, since it works when the number of partitions is small.
Are you caching any other data? I ask because broadcast variables use the cache memory.

Thanks
Ankur

On Mon, Mar 7, 2016 at 3:34 PM, Jeff Zhang <zjf...@gmail.com> wrote:

> Any reason why you broadcast such a large variable? It doesn't make
> sense to me.
>
> On Tue, Mar 8, 2016 at 7:29 AM, Arash <aras...@gmail.com> wrote:
>
>> Hello all,
>>
>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>> but haven't been able to make it work so far.
>>
>> It looks like the executors start to run out of memory during
>> deserialization. This behavior only shows itself when the number of
>> partitions is above a few tens; the broadcast does work for 10 or 20
>> partitions.
>>
>> I'm using the following setup to observe the problem:
>>
>> val tuples: Array[((String, String), (String, String))]      // ~10M tuples
>> val tuplesBc = sc.broadcast(tuples)
>> val numsRdd = sc.parallelize(1 to 5000, 100)
>> numsRdd.map(n => tuplesBc.value.head).count()
>>
>> If I set the number of partitions for numsRdd to 20, the count goes
>> through successfully, but at 100, I'll start to get errors such as:
>>
>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>>         at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>         at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>         at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>
>> I'm using Spark 1.5.2. Cluster nodes are Amazon r3.2xlarge. The Spark
>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>> according to the Spark UI environment). We're also using Kryo serialization,
>> and YARN is the resource manager.
>>
>> Any ideas as to what might be going wrong and how to debug this?
>>
>> Thanks,
>> Arash
>
> --
> Best Regards
>
> Jeff Zhang