Yeah, the Spark engine is pretty clever and it's best not to prematurely
optimize. It would be interesting to profile your join vs. the collect on the
smaller dataset. I suspect that the join is faster (even before you broadcast
it back out).
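
Something like this rough harness would do it (just a sketch; bigRdd and
smallRdd are hypothetical stand-ins for the two sides of your join):

    // driver-side wall-clock timing; count() forces full evaluation
    def time[T](label: String)(body: => T): T = {
      val start = System.nanoTime()
      val result = body
      println(s"$label took ${(System.nanoTime() - start) / 1e9} s")
      result
    }

    time("join")(bigRdd.join(smallRdd).count())
    time("collect + broadcast") {
      val bc = sc.broadcast(smallRdd.collect().toMap)
      bigRdd.map { case (k, v) => (k, (v, bc.value.get(k))) }.count()
    }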

I’m also curious about the broadcast OOM - did you try expanding the driver 
memory?
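
For reference, that would be something like the following (the 16g figure is
just illustrative; the driver needs room for the collected array plus
serialization overhead, and the deserialized objects can take noticeably more
heap than the ~1G serialized form):

    spark-submit --driver-memory 16g ...

or the equivalent spark.driver.memory setting in spark-defaults.conf. Note it
has to be set before the driver JVM starts, so setting it in SparkConf from
application code won't help in client mode.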

> On Mar 7, 2016, at 8:28 PM, Arash <aras...@gmail.com> wrote:
> 
> So I just implemented the logic through a standard join (without collect and 
> broadcast) and it's working great.
> 
> The idea behind trying the broadcast was that since the other side of the
> join is a much larger dataset, the process might be faster with collect and
> broadcast, since that avoids shuffling the bigger dataset.
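> 
> Roughly, the pattern I had in mind was a map-side join (a sketch, with
> smallRdd/bigRdd as hypothetical stand-ins for the two sides):
> 
>     val smallMap = smallRdd.collectAsMap()        // pull the small side to the driver
>     val smallBc = sc.broadcast(smallMap)          // ship it once per executor
>     val joined = bigRdd.flatMap { case (k, v) =>
>       smallBc.value.get(k).map(w => (k, (v, w)))  // inner join, no shuffle of bigRdd
>     }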
> 
> I think the join is working much better in this case, so I'll probably just
> use that, but I'm still a bit curious as to why the error is happening.
> 
> On Mon, Mar 7, 2016 at 5:55 PM, Tristan Nixon <st...@memeticlabs.org> wrote:
> I’m not sure I understand - if it was already distributed over the cluster in 
> an RDD, why would you want to collect and then re-send it as a broadcast 
> variable? Why not simply use the RDD that is already distributed on the 
> worker nodes?
> 
>> On Mar 7, 2016, at 7:44 PM, Arash <aras...@gmail.com> wrote:
>> 
>> Hi Tristan, 
>> 
>> This is not static, I actually collect it from an RDD to the driver. 
>> 
>> On Mon, Mar 7, 2016 at 5:42 PM, Tristan Nixon <st...@memeticlabs.org> wrote:
>> Hi Arash,
>> 
>> is this static data? Have you considered including it in your jars and
>> deserializing it from the jar on each worker node?
>> It's not pretty, but it's a workaround for serialization troubles.
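>> 
>> Something along these lines (just a sketch; it assumes the array was
>> Java-serialized into a /tuples.bin resource packaged with the jar, and the
>> names are hypothetical):
>> 
>>   object StaticData {
>>     // deserialized lazily, once per executor JVM, from the jar's classpath
>>     lazy val tuples: Array[((String, String), (String, String))] = {
>>       val in = new java.io.ObjectInputStream(
>>         getClass.getResourceAsStream("/tuples.bin"))
>>       try in.readObject().asInstanceOf[Array[((String, String), (String, String))]]
>>       finally in.close()
>>     }
>>   }
>> 
>> Tasks can then reference StaticData.tuples directly instead of going through
>> a broadcast variable.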
>> 
>>> On Mar 7, 2016, at 5:29 PM, Arash <aras...@gmail.com> wrote:
>>> 
>>> Hello all,
>>> 
>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes but 
>>> haven't been able to make it work so far.
>>> 
>>> It looks like the executors start to run out of memory during
>>> deserialization. This behavior only shows itself when the number of
>>> partitions is above a few tens; the broadcast does work for 10 or 20
>>> partitions.
>>> 
>>> I'm using the following setup to observe the problem:
>>> 
>>> val tuples: Array[((String, String), (String, String))]      // ~ 10M tuples
>>> val tuplesBc = sc.broadcast(tuples)
>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>> numsRdd.map(n => tuplesBc.value.head).count()
>>> 
>>> If I set the number of partitions for numsRdd to 20, the count goes through
>>> successfully, but at 100, I start to get errors such as:
>>> 
>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>>>         at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>>         at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>         at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>> 
>>> 
>>> I'm using Spark 1.5.2. Cluster nodes are Amazon r3.2xlarge. The Spark
>>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>>> according to the Spark UI environment page). We're also using Kryo
>>> serialization, and YARN is the resource manager.
>>> 
>>> Any ideas as to what might be going wrong and how to debug this?
>>> 
>>> Thanks,
>>> Arash
>>> 
>> 
>> 
> 
> 
