Doing the reduceByKey without changing the number of partitions and then do
a coalesce works.
But the other version still hangs, without any information (while working
with spark 1.1.1). The previous logs don't seem to be related to what
happens.
I don't think this is a memory issue as the GC time remains low and the
shuffle read is small. My guess is that it might be related to a high
number of initial partitions, but in that case shouldn't it fail for
coalesce too...?

Does anyone have an idea where to look at to find what the source of the
problem is?

Thanks,
Eugen

2015-03-13 19:18 GMT+01:00 Eugen Cepoi <cepoi.eu...@gmail.com>:

> Hum increased it to 1024 but doesn't help still have the same problem :(
>
> 2015-03-13 18:28 GMT+01:00 Eugen Cepoi <cepoi.eu...@gmail.com>:
>
>> The one by default 0.07 of executor memory. I'll try increasing it and
>> post back the result.
>>
>> Thanks
>>
>> 2015-03-13 18:09 GMT+01:00 Ted Yu <yuzhih...@gmail.com>:
>>
>>> Might be related: what's the value
>>> for spark.yarn.executor.memoryOverhead ?
>>>
>>> See SPARK-6085
>>>
>>> Cheers
>>>
>>> On Fri, Mar 13, 2015 at 9:45 AM, Eugen Cepoi <cepoi.eu...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1.
>>>> Strange thing, the exact same code does work (after upgrade) in the
>>>> spark-shell. But this information might be misleading as it works with
>>>> 1.1.1...
>>>>
>>>>
>>>> *The job takes as input two data sets:*
>>>>  - rdd A of +170gb (with less it is hard to reproduce) and more than
>>>> 11K partitions
>>>>  - rdd B of +100mb and 32 partitions
>>>>
>>>> I run it via EMR over YARN and use 4*m3.xlarge computing nodes. I am
>>>> not sure the executor config is relevant here. Anyway I tried with multiple
>>>> small executors with fewer ram and the inverse.
>>>>
>>>>
>>>> *The job basically does this:*
>>>> A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save
>>>>
>>>> After the flatMap rdd A size is much smaller similar to B.
>>>>
>>>> *Configs I used to run this job:*
>>>>
>>>> storage.memoryFraction: 0
>>>> shuffle.memoryFraction: 0.5
>>>>
>>>> akka.timeout 500
>>>> akka.frameSize 40
>>>>
>>>> // this one defines also the memory used by yarn master, but not sure
>>>> if it needs to be important
>>>> driver.memory 5g
>>>> excutor.memory 4250m
>>>>
>>>> I have 7 executors with 2 cores.
>>>>
>>>> *What happens:*
>>>> The job produces two stages: keyBy and save. The keyBy stage runs fine
>>>> and produces a shuffle write of ~150mb. The save stage where the suffle
>>>> read occurs hangs. Greater the initial dataset is more tasks hang.
>>>>
>>>> I did run it for much larger datasets with same config/cluster but
>>>> without doing the union and it worked fine.
>>>>
>>>> *Some more infos and logs:*
>>>>
>>>> Amongst 4 nodes 1 finished all his tasks and the "running" ones are on
>>>> the 3 other nodes. But not sure this is a good information (one node that
>>>> completed all his work vs the others) as with some smaller dataset I manage
>>>> to get only one hanging task.
>>>>
>>>> Here are the last parts of the executor logs that show some timeouts.
>>>>
>>>> *An executor from node ip-10-182-98-220*
>>>>
>>>> 15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 
>>>> remote fetches in 66 ms
>>>> 15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in 
>>>> connection from /10.181.48.153:56806
>>>> java.io.IOException: Connection timed out
>>>>
>>>>
>>>> *An executor from node ip-10-181-103-186*
>>>>
>>>> 15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 
>>>> remote fetches in 20 ms
>>>> 15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in 
>>>> connection from /10.182.98.220:38784
>>>> java.io.IOException: Connection timed out
>>>>
>>>> *An executor from node ip-10-181-48-153* (all the logs bellow belong this 
>>>> node)
>>>>
>>>> 15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 
>>>> (TID 13860). 802 bytes result sent to driver
>>>> 15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in 
>>>> connection from /10.181.103.186:46381
>>>> java.io.IOException: Connection timed out
>>>>
>>>> *Followed by many *
>>>>
>>>> 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending 
>>>> result 
>>>> ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, 
>>>> chunkIndex=405}, 
>>>> buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
>>>>  offset=8631, length=571}} to /10.181.103.186:46381; closing connection
>>>> java.nio.channels.ClosedChannelException
>>>>
>>>> *with last one being*
>>>>
>>>> 15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending 
>>>> result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to 
>>>> /10.181.103.186:46381; closing connection
>>>> java.nio.channels.ClosedChannelException
>>>>
>>>>
>>>> The executors from the node that finished his tasks doesn't show
>>>> anything special.
>>>>
>>>> Note that I don't cache anything thus reduced the
>>>> storage.memoryFraction to 0.
>>>> I see some of those, but don't think they are related.
>>>>
>>>> 15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B (blocks) + 
>>>> 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B. Storage limit = 
>>>> 0.0 B.
>>>>
>>>>
>>>> Sorry for the long email with maybe misleading infos, but I hope it
>>>> might help to track down what happens and why it was working with spark
>>>> 1.1.1.
>>>>
>>>> Thanks,
>>>> Eugen
>>>>
>>>>
>>>
>>
>

Reply via email to