Hi,

I have a job that hangs after upgrading to Spark 1.2.1 from 1.1.1. The strange
thing is that the exact same code works (after the upgrade) in the spark-shell.
But this information might be misleading, as it also works with 1.1.1...


*The job takes as input two data sets:*
 - RDD A of 170+ GB (with less it is hard to reproduce) and more than 11K
partitions
 - RDD B of 100+ MB and 32 partitions

I run it on EMR over YARN with 4 x m3.xlarge compute nodes. I am not sure the
executor config is relevant here; anyway, I tried both multiple small executors
with less RAM each and the inverse.


*The job basically does this:*
A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save

After the flatMap, RDD A is much smaller, similar in size to B.
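To make it a bit more concrete, here is a simplified sketch of the job shape,
pasteable into spark-shell. Item, explode, extractKey, merge, format and the
paths are placeholder names I use only for illustration; the real functions
and types are more involved.

// Simplified sketch, not the real code: placeholders stand in for the
// actual functions, types and paths.
case class Item(key: String, value: Long)
def explode(i: Item): Seq[Item] = Seq(i)        // the real flatMap shrinks A a lot
def extractKey(i: Item): String = i.key         // the "f" passed to keyBy
def merge(x: Item, y: Item): Item = Item(x.key, x.value + y.value)
def format(kv: (String, Item)): String = kv._1 + "\t" + kv._2.value

val A = sc.objectFile[Item]("hdfs:///input/A")  // 170+ GB, 11K+ partitions
val B = sc.objectFile[Item]("hdfs:///input/B")  // ~100 MB, 32 partitions

A.flatMap(explode)
  .union(B)
  .keyBy(extractKey)
  .reduceByKey(merge, 32)                       // 32 reduce partitions
  .map(format)
  .saveAsTextFile("hdfs:///output/result")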

*Configs I used to run this job:*

storage.memoryFraction: 0
shuffle.memoryFraction: 0.5

akka.timeout 500
akka.frameSize 40

// this one also defines the memory used by the YARN master, but I am not sure
it matters here
driver.memory 5g
executor.memory 4250m

I have 7 executors with 2 cores each.
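For reference, these settings with their full spark.* property names look
roughly like the sketch below (just an illustration of the names and values,
not the exact way I pass them; note that spark.driver.memory only takes effect
if set before the driver JVM starts, e.g. via spark-submit --driver-memory).

// Same settings spelled out with full property names.
// Executors: 7 instances with 2 cores each, passed on the command line
// as --num-executors 7 --executor-cores 2.
val conf = new org.apache.spark.SparkConf()
  .set("spark.storage.memoryFraction", "0")     // nothing is cached
  .set("spark.shuffle.memoryFraction", "0.5")
  .set("spark.akka.timeout", "500")             // seconds
  .set("spark.akka.frameSize", "40")            // MB
  .set("spark.driver.memory", "5g")             // only effective at submit time
  .set("spark.executor.memory", "4250m")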

*What happens:*
The job produces two stages: keyBy and save. The keyBy stage runs fine and
produces a shuffle write of ~150 MB. The save stage, where the shuffle read
occurs, hangs. The larger the initial dataset, the more tasks hang.

I did run it on much larger datasets with the same config/cluster, but without
the union, and it worked fine.

*Some more info and logs:*

Of the 4 nodes, 1 finished all its tasks and the "running" ones are on the
other 3 nodes. I am not sure this is useful information (one node that
completed all its work vs the others), as with some smaller datasets I managed
to get only one hanging task.

Here are the last parts of the executor logs that show some timeouts.

*An executor from node ip-10-182-98-220*

15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6
remote fetches in 66 ms
15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in
connection from /10.181.48.153:56806
java.io.IOException: Connection timed out


*An executor from node ip-10-181-103-186*

15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6
remote fetches in 20 ms
15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in
connection from /10.182.98.220:38784
java.io.IOException: Connection timed out

*An executor from node ip-10-181-48-153* (all the logs below belong to this node)

15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage
1.0 (TID 13860). 802 bytes result sent to driver
15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in
connection from /10.181.103.186:46381
java.io.IOException: Connection timed out

*Followed by many of these:*

15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016,
chunkIndex=405},
buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data,
offset=8631, length=571}} to /10.181.103.186:46381; closing connection
java.nio.channels.ClosedChannelException

*with the last one being:*

15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending
result RpcResponse{requestId=7377187355282895939,
response=[B@6fcd0014} to /10.181.103.186:46381; closing connection
java.nio.channels.ClosedChannelException


The executors from the node that finished its tasks don't show anything
special.

Note that I don't cache anything, which is why I reduced storage.memoryFraction
to 0. I see some messages like the one below, but I don't think they are
related.

15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B
(blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B.
Storage limit = 0.0 B.


Sorry for the long email with possibly misleading information, but I hope it
helps to track down what happens and why it was working with Spark 1.1.1.

Thanks,
Eugen
