Hmm, I increased it to 1024 but it doesn't help, I still have the same problem :(
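
(For reference, a minimal sketch of how one might pass that setting, assuming
the job is launched with spark-submit on YARN; the value is in MB, and the
trailing "..." stands in for the rest of the submit command:)

spark-submit --conf spark.yarn.executor.memoryOverhead=1024 ...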

2015-03-13 18:28 GMT+01:00 Eugen Cepoi <cepoi.eu...@gmail.com>:

> The default one, 0.07 of the executor memory. I'll try increasing it and
> post back the result.
>
> Thanks
>
> 2015-03-13 18:09 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>
>> Might be related: what's the value for spark.yarn.executor.memoryOverhead?
>>
>> See SPARK-6085
>>
>> Cheers
>>
>> On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi <cepoi.eu...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I have a job that hangs after upgrading to Spark 1.2.1 from 1.1.1.
>>> Strangely, the exact same code does work (after the upgrade) in the
>>> spark-shell. But this information might be misleading, as it also works
>>> with 1.1.1...
>>>
>>>
>>> *The job takes as input two data sets:*
>>>  - RDD A of 170+ GB (with less it is hard to reproduce) and more than
>>> 11K partitions
>>>  - RDD B of 100+ MB and 32 partitions
>>>
>>> I run it via EMR over YARN on 4 m3.xlarge compute nodes. I am not sure
>>> the executor config is relevant here; anyway, I tried with many small
>>> executors with less RAM each, and the inverse.
>>>
>>>
>>> *The job basically does this:*
>>> A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save
>>>
>>> After the flatMap, RDD A's size is much smaller, similar to B's.
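>>>
>>> In case it helps to picture it, here is a minimal Scala sketch of the
>>> shape of the pipeline (Record, the parsing, the key function and the
>>> paths are placeholders I made up, not the real code):
>>>
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkContext._   // pair-RDD implicits (Spark 1.2)
>>> import org.apache.spark.rdd.RDD
>>>
>>> object JobSketch {
>>>   // Placeholder record type and parsing; the real job's types differ.
>>>   case class Record(key: String, value: Long)
>>>   def parse(line: String): Seq[Record] = Seq.empty  // stands in for the real flatMap body
>>>
>>>   def run(sc: SparkContext): Unit = {
>>>     val a: RDD[String] = sc.textFile("hdfs://.../datasetA")           // ~170 GB, 11K+ partitions
>>>     val b: RDD[Record] = sc.objectFile[Record]("hdfs://.../datasetB") // ~100 MB, 32 partitions
>>>
>>>     a.flatMap(parse)               // after this, A is roughly B's size
>>>      .union(b)
>>>      .keyBy(_.key)                 // the "f" above
>>>      .reduceByKey((x, y) => Record(x.key, x.value + y.value), 32)
>>>      .map { case (k, r) => s"$k\t${r.value}" }
>>>      .saveAsTextFile("hdfs://.../output")
>>>   }
>>> }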
>>>
>>> *Configs I used to run this job:*
>>>
>>> storage.memoryFraction: 0
>>> shuffle.memoryFraction: 0.5
>>>
>>> akka.timeout 500
>>> akka.frameSize 40
>>>
>>> // this also defines the memory used by the YARN application master,
>>> // but I'm not sure it matters here
>>> driver.memory 5g
>>> executor.memory 4250m
>>>
>>> I have 7 executors with 2 cores each.
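>>>
>>> Roughly how these settings translate into a spark-submit invocation (a
>>> sketch only; the class and jar names are placeholders):
>>>
>>> spark-submit --master yarn-cluster \
>>>   --driver-memory 5g \
>>>   --executor-memory 4250m \
>>>   --num-executors 7 \
>>>   --executor-cores 2 \
>>>   --conf spark.storage.memoryFraction=0 \
>>>   --conf spark.shuffle.memoryFraction=0.5 \
>>>   --conf spark.akka.timeout=500 \
>>>   --conf spark.akka.frameSize=40 \
>>>   --class com.example.MyJob \
>>>   my-job.jar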
>>>
>>> *What happens:*
>>> The job produces two stages: keyBy and save. The keyBy stage runs fine
>>> and produces a shuffle write of ~150 MB. The save stage, where the shuffle
>>> read occurs, hangs. The larger the initial dataset is, the more tasks hang.
>>>
>>> I did run it on much larger datasets with the same config/cluster, but
>>> without the union, and it worked fine.
>>>
>>> *Some more infos and logs:*
>>>
>>> Among the 4 nodes, 1 finished all its tasks and the "running" ones are on
>>> the 3 other nodes. But I'm not sure this is useful information (one node
>>> that completed all its work vs. the others), as with a smaller dataset I
>>> managed to get only one hanging task.
>>>
>>> Here are the last parts of the executor logs that show some timeouts.
>>>
>>> *An executor from node ip-10-182-98-220*
>>>
>>> 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 
>>> remote fetches in 66 ms
>>> 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in 
>>> connection from /10.181.48.153:56806
>>> java.io.IOException: Connection timed out
>>>
>>>
>>> *An executor from node ip-10-181-103-186*
>>>
>>> 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 
>>> remote fetches in 20 ms
>>> 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in 
>>> connection from /10.182.98.220:38784
>>> java.io.IOException: Connection timed out
>>>
>>> *An executor from node ip-10-181-48-153* (all the logs below belong to
>>> this node)
>>>
>>> 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 
>>> (TID 13860). 802 bytes result sent to driver
>>> 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in 
>>> connection from /10.181.103.186:46381
>>> java.io.IOException: Connection timed out
>>>
>>> *Followed by many *
>>>
>>> 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending 
>>> result 
>>> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, 
>>> chunkIndex=405}, 
>>> buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
>>>  offset=8631, length=571}} to /10.181.103.186:46381; closing connection
>>> java.nio.channels.ClosedChannelException
>>>
>>> *with the last one being*
>>>
>>> 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending 
>>> result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to 
>>> /10.181.103.186:46381; closing connection
>>> java.nio.channels.ClosedChannelException
>>>
>>>
>>> The executors from the node that finished its tasks don't show anything
>>> special.
>>>
>>> Note that I don't cache anything, hence I reduced storage.memoryFraction
>>> to 0.
>>> I see some lines like the following, but I don't think they are related.
>>>
>>> 15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B (blocks) + 
>>> 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B. Storage limit = 
>>> 0.0 B.
>>>
>>>
>>> Sorry for the long email with possibly misleading info, but I hope it
>>> helps track down what happens and why it was working with Spark 1.1.1.
>>>
>>> Thanks,
>>> Eugen
>>>
>>>
>>
>
