Hi Takeshi, we have the driver memory set to 40G, so I don't think the driver is
having memory issues. It looks like the executors start to fail due to
memory issues, and then the peer-to-peer broadcast algorithm starts to fail.
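
As a quick sanity check (just a sketch using sc.getConf, nothing specific to
our job), this is the kind of thing I run in the shell to confirm which memory
and serializer settings the application actually picked up:

// dump the memory- and serializer-related config keys the driver sees
sc.getConf.getAll
  .filter { case (k, _) => k.contains("memory") || k.contains("serializer") }
  .foreach { case (k, v) => println(s"$k = $v") }  // e.g. spark.driver.memory = 40g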

On Mon, Mar 7, 2016 at 5:36 PM, Takeshi Yamamuro <linguin....@gmail.com>
wrote:

> Hi,
>
> I think the broadcast logic itself works regardless of input size (though I
> have no idea about its efficiency at that scale).
>
> How much memory does your driver have?
> When you broadcast large variables, the driver uses a lot of memory to split
> the variable into blocks and serialize them.
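>
> For example (a sketch with placeholder sizes): spark.driver.memory has to be
> in place before the driver JVM starts, e.g. in spark-defaults.conf or via
> spark-submit --driver-memory, while a setting like spark.driver.maxResultSize
> can go on the SparkConf directly:
>
> // sketch, placeholder values
> val conf = new org.apache.spark.SparkConf()
>   .set("spark.driver.maxResultSize", "4g")  // cap on results pulled back to the driver
> // driver heap itself: spark-submit --driver-memory 40g (it cannot be raised
> // from inside an already-running driver)
> val sc = new org.apache.spark.SparkContext(conf)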
>
> Thanks,
> maropu
>
> On Tue, Mar 8, 2016 at 9:30 AM, Arash <aras...@gmail.com> wrote:
>
>> Hi Ankur,
>>
>> For this specific test, I'm only running the few lines of code that are
>> pasted. Nothing else is cached in the cluster.
>>
>> Thanks,
>> Arash
>>
>>
>> On Mon, Mar 7, 2016 at 4:07 PM, Ankur Srivastava <
>> ankur.srivast...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> We have a use case where we broadcast ~4GB of data on m3.2xlarge nodes, so
>>> your object size should not be an issue. Also, based on your explanation it
>>> does not look like a broadcast issue, since it works when the number of
>>> partitions is small.
>>>
>>> Are you caching any other data? Broadcast variables use the cache (storage)
>>> memory.
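>>>
>>> As a rough sketch of the knobs involved with the 1.5.x (legacy) memory
>>> model, the values below are just the defaults:
>>>
>>> // storage memory holds cached RDD blocks and broadcast blocks;
>>> // shuffle/execution memory is carved out separately (defaults shown)
>>> val conf = new org.apache.spark.SparkConf()
>>>   .set("spark.storage.memoryFraction", "0.6")
>>>   .set("spark.shuffle.memoryFraction", "0.2")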
>>>
>>> Thanks
>>> Ankur
>>>
>>> On Mon, Mar 7, 2016 at 3:34 PM, Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> Any reason why you broadcast such a large variable? It doesn't make sense
>>>> to me.
>>>>
>>>> On Tue, Mar 8, 2016 at 7:29 AM, Arash <aras...@gmail.com> wrote:
>>>>
>>>>> Hello all,
>>>>>
>>>>> I'm trying to broadcast a variable of size ~1G to a cluster of 20
>>>>> nodes but haven't been able to make it work so far.
>>>>>
>>>>> It looks like the executors start to run out of memory during
>>>>> deserialization. This behavior only shows itself when the number of
>>>>> partitions is above a few tens; the broadcast does work for 10 or 20
>>>>> partitions.
>>>>>
>>>>> I'm using the following setup to observe the problem:
>>>>>
>>>>> val tuples: Array[((String, String), (String, String))]  // ~10M tuples
>>>>> val tuplesBc = sc.broadcast(tuples)
>>>>> val numsRdd = sc.parallelize(1 to 5000, 100)
>>>>> numsRdd.map(n => tuplesBc.value.head).count()
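>>>>>
>>>>> (To make that snippet runnable end to end here, tuples can be filled with
>>>>> synthetic data; the contents below are made up, only the rough count
>>>>> matters, and the rest of the snippet stays the same:)
>>>>>
>>>>> // synthetic ~10M tuples; contents are made up for illustration
>>>>> val tuples: Array[((String, String), (String, String))] =
>>>>>   Array.tabulate(10000000) { i => ((s"a$i", s"b$i"), (s"c$i", s"d$i")) }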
>>>>>
>>>>> If I set the number of partitions for numsRdd to 20, the count goes
>>>>> through successfully, but at 100, I start to get errors such as:
>>>>>
>>>>> 16/03/07 19:35:32 WARN scheduler.TaskSetManager: Lost task 77.0 in stage 1.0 (TID 1677, xxx.ec2.internal): java.lang.OutOfMemoryError: Java heap space
>>>>>         at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3472)
>>>>>         at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3278)
>>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1789)
>>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>>>>>         at scala.collection.immutable.HashMap$SerializationProxy.readObject(HashMap.scala:516)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>>         at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>>>>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>>>>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>>>>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>>>>
>>>>>
>>>>> I'm using Spark 1.5.2. The cluster nodes are Amazon r3.2xlarge. The Spark
>>>>> property maximizeResourceAllocation is set to true (executor.memory = 48G
>>>>> according to the Spark UI environment page). We're also using Kryo
>>>>> serialization, and YARN is the resource manager.
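>>>>>
>>>>> (For reference, the serializer setup is roughly the following; the class
>>>>> registered below is just a placeholder for what we actually register:)
>>>>>
>>>>> val conf = new org.apache.spark.SparkConf()
>>>>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>>>>   .registerKryoClasses(Array(classOf[(String, String)]))  // placeholder class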
>>>>>
>>>>> Any ideas as to what might be going wrong and how to debug this?
>>>>>
>>>>> Thanks,
>>>>> Arash
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>
