Hi Ankur,

For this specific test, I'm only running the few lines of code that are
pasted. Nothing else is cached in the cluster.
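
For what it's worth, a quick sanity check from the shell (a minimal sketch;
both calls are standard SparkContext methods):

// confirm nothing is persisted and inspect per-executor block-manager memory
println(sc.getPersistentRDDs)        // empty Map when no RDDs are cached
println(sc.getExecutorMemoryStatus)  // executor -> (max memory, remaining memory)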

Thanks,
Arash

On Mon, Mar 7, 2016 at 4:07 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:

> Hi,
>
> We have a use case where we broadcast ~4 GB of data on m3.2xlarge nodes, so
> your object size is not the issue. Also, based on your explanation, this does
> not look like a broadcast issue, since it works when the number of partitions
> is small.
>
> Are you caching any other data? Broadcast variables use the cache (storage)
> memory.
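>
> On 1.5.x you can get a rough sense of how much heap is reserved for that
> storage region (a minimal sketch; spark.storage.memoryFraction and
> spark.storage.safetyFraction are the legacy storage settings, shown with
> their default values):
>
> // cached blocks and broadcast blocks share the storage region, roughly:
> //   storage bytes ~= executor heap * memoryFraction * safetyFraction
> val conf = sc.getConf
> val storageFrac = conf.getDouble("spark.storage.memoryFraction", 0.6)
> val safetyFrac = conf.getDouble("spark.storage.safetyFraction", 0.9)
> println(s"usable storage fraction of the heap: ${storageFrac * safetyFrac}")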
>
> Thanks
> Ankur
>
> On Mon, Mar 7, 2016 at 3:34 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Any reason why you broadcast such a large variable? It doesn't make sense
>> to me.
>>
>> On Tue, Mar 8, 2016 at 7:29 AM, Arash <aras...@gmail.com> wrote:
>>
>>> Hello all,
>>>
>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20 nodes
>>> but haven't been able to make it work so far.
>>>
>>> It looks like the executors start to run out of memory during
>>> deserialization. This behavior only shows up when the number of partitions
>>> is above a few tens; the broadcast does work with 10 or 20 partitions.
>>>
>>> I'm using the following setup to observe the problem:
>>>
>>> val tuples: Array[((String, String), (String, String))]  // ~10M tuples
>>> val tuplesBc = sc.broadcast(tuples)
>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>> numsRdd.map(n => tuplesBc.value.head).count()
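>>>
>>> For reference, a self-contained variant of the above (a sketch; the
>>> generated tuples are just placeholders for the real data):
>>>
>>> // build ~10M dummy tuples so the example runs on its own in spark-shell
>>> val tuples: Array[((String, String), (String, String))] =
>>>   (1 to 10000000).map(i => ((s"k1-$i", s"k2-$i"), (s"v1-$i", s"v2-$i"))).toArray
>>> val tuplesBc = sc.broadcast(tuples)            // ship the array to every executor
>>> val numsRdd = sc.parallelize(1 to 5000, 100)   // 100 partitions reproduces the failure
>>> numsRdd.map(n => tuplesBc.value.head).count()  // each task reads the broadcast value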
>>>
>>> If I set the number of partitions for numsRdd to 20, the count goes
>>> through successfully, but at 100, I start to get errors such as:
>>>
>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>>>         at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>>         at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>         at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>
>>>
>>> I'm using Spark 1.5.2. The cluster nodes are Amazon r3.2xlarge. The Spark
>>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>>> according to the Spark UI environment page). We're also using Kryo
>>> serialization, and YARN is the resource manager.
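>>>
>>> For context, the serializer is configured along these lines (a sketch with
>>> illustrative values, not our exact settings):
>>>
>>> val conf = new org.apache.spark.SparkConf()
>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>   .set("spark.kryoserializer.buffer.max", "512m") // large broadcasts need a larger Kryo buffer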
>>>
>>> Any ideas as to what might be going wrong, and how to debug this?
>>>
>>> Thanks,
>>> Arash
>>>
>>>
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>
>
