Driver memory is set to 40G, and the OOM seems to be happening on the executors. I might try a different broadcast block size (vs. the current 4m), as Takeshi suggested, to see if it makes a difference.
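For reference, this is roughly how I'd set it when building the context - a sketch only, where the app name and the "16m" value are placeholders rather than a recommendation:

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: override spark.broadcast.blockSize (4m by default) before the
// SparkContext is created. "16m" is an arbitrary test value.
val conf = new SparkConf()
  .setAppName("broadcast-blocksize-test")
  .set("spark.broadcast.blockSize", "16m")
val sc = new SparkContext(conf)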
On Mon, Mar 7, 2016 at 6:54 PM, Tristan Nixon <st...@memeticlabs.org> wrote:

> Yeah, the Spark engine is pretty clever and it's best not to prematurely
> optimize. It would be interesting to profile your join vs. the collect on
> the smaller dataset. I suspect that the join is faster (even before you
> broadcast it back out).
>
> I'm also curious about the broadcast OOM - did you try expanding the
> driver memory?
>
> On Mar 7, 2016, at 8:28 PM, Arash <aras...@gmail.com> wrote:
>
> So I just implemented the logic through a standard join (without collect
> and broadcast) and it's working great.
>
> The idea behind trying the broadcast was that since the other side of the
> join is a much larger dataset, the process might be faster through collect
> and broadcast, since it avoids the shuffle of the bigger dataset.
>
> I think the join is working much better in this case, so I'll probably
> just use that; still a bit curious as to why the error is happening.
>
> On Mon, Mar 7, 2016 at 5:55 PM, Tristan Nixon <st...@memeticlabs.org>
> wrote:
>
>> I'm not sure I understand - if it was already distributed over the
>> cluster in an RDD, why would you want to collect it and then re-send it
>> as a broadcast variable? Why not simply use the RDD that is already
>> distributed on the worker nodes?
>>
>> On Mar 7, 2016, at 7:44 PM, Arash <aras...@gmail.com> wrote:
>>
>> Hi Tristan,
>>
>> This is not static; I actually collect it from an RDD to the driver.
>>
>> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon <st...@memeticlabs.org>
>> wrote:
>>
>>> Hi Arash,
>>>
>>> Is this static data? Have you considered including it in your jars and
>>> deserializing it from the jar on each worker node?
>>> It's not pretty, but it's a workaround for serialization troubles.
>>>
>>> On Mar 7, 2016, at 5:29 PM, Arash <aras...@gmail.com> wrote:
>>>
>>> Hello all,
>>>
>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>>> but haven't been able to make it work so far.
>>>
>>> It looks like the executors start to run out of memory during
>>> deserialization. This behavior only shows itself when the number of
>>> partitions is above a few tens; the broadcast does work for 10 or 20
>>> partitions.
>>>
>>> I'm using the following setup to observe the problem:
>>>
>>> val tuples: Array[((String, String), (String, String))] // ~10M tuples
>>> val tuplesBc = sc.broadcast(tuples)
>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>> numsRdd.map(n => tuplesBc.value.head).count()
>>>
>>> If I set the number of partitions for numsRdd to 20, the count goes
>>> through successfully, but at 100, I'll start to get errors such as:
>>>
>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage
>>> 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>>>   at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>>   at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>   at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>   at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>
>>> I'm using Spark 1.5.2. Cluster nodes are Amazon r3.2xlarge. The Spark
>>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>>> according to the Spark UI environment). We're also using Kryo
>>> serialization, and YARN is the resource manager.
>>>
>>> Any ideas as to what might be going wrong and how to debug this?
>>>
>>> Thanks,
>>> Arash
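P.S. For anyone who finds this thread in the archives, here is a minimal, self-contained sketch of the two approaches compared above. The datasets and types are made-up stand-ins, and the broadcast variant assumes the small side fits comfortably in driver and executor memory:

import org.apache.spark.rdd.RDD

// Illustrative stand-ins for the two sides of the join (contents invented).
val small: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 2)))
val large: RDD[(String, String)] =
  sc.parallelize(Seq(("a", "x"), ("a", "y"), ("c", "z")))

// Approach 1: a standard join. Both sides are shuffled by key, but nothing
// is funneled through the driver. This is the variant that ended up working.
val joined: RDD[(String, (String, Int))] = large.join(small)

// Approach 2: collect + broadcast (a map-side join). It avoids shuffling the
// large side, but the small side has to be collected to the driver and the
// broadcast deserialized on every executor - which is where the OOMs appeared.
val smallBc = sc.broadcast(small.collectAsMap())
val joined2: RDD[(String, (String, Int))] = large.flatMap { case (k, v) =>
  smallBc.value.get(k).map(s => (k, (v, s)))
}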