Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
Hey Dmitriy, thanks for sharing your solution. I have some more updates. The problem shows up when a shuffle is involved. Using coalesce with shuffle = true behaves like reduceByKey with a smaller number of partitions, except that the whole save stage hangs. I am not sure yet whether it only happens with UnionRDD or also with cogroup-like operations. Changing spark.shuffle.blockTransferService to nio (the default before 1.2) solves the problem, so it looks like the issue arises with the new Netty-based implementation.
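For reference, the workaround boils down to something like the following sketch; the app name is a placeholder, the relevant part is the spark.shuffle.blockTransferService setting.

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: switch the shuffle transfer service back to the pre-1.2 nio
// implementation instead of the netty-based one introduced in 1.2.
val conf = new SparkConf()
  .setAppName("shuffle-hang-workaround") // placeholder app name
  .set("spark.shuffle.blockTransferService", "nio")
val sc = new SparkContext(conf)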
Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
Doing the reduceByKey without changing the number of partitions and then doing a coalesce works, but the other version still hangs, without any information (while it works with spark 1.1.1). The previous logs don't seem related to what happens. I don't think this is a memory issue, as the GC time remains low and the shuffle read is small. My guess is that it might be related to the high number of initial partitions, but in that case shouldn't it fail for coalesce too...? Does anyone have an idea where to look to find the source of the problem? Thanks, Eugen
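PS: to make the two variants concrete, here is a rough spark-shell style sketch with tiny placeholder inputs and a trivial merge function; it only illustrates the shape of the two pipelines, not the real job.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("reduce-then-coalesce-sketch").setMaster("local[2]"))

// Tiny stand-ins for the real inputs (the real rdd A has 11K+ partitions).
val A = sc.parallelize(1 to 10000, 64).map(i => (i % 100, 1))
val B = sc.parallelize(1 to 100, 8).map(i => (i % 100, 1))

// Variant that hangs on the save stage in 1.2.1: reduceByKey shrinks straight to 32 partitions.
val hanging = A.union(B).reduceByKey(_ + _, 32)

// Variant that works: reduce with the existing partitioning, then shrink with coalesce.
val working = A.union(B).reduceByKey(_ + _).coalesce(32)

working.saveAsTextFile("/tmp/reduce-then-coalesce-sketch") // placeholder output path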
Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
FWIW, I observed similar behavior in a similar situation. I was able to work around it by forcefully committing one of the rdds into cache right before the union, and forcing that by executing take(1). Nothing else ever helped. Seems like a yet-undiscovered 1.2.x thing.
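Concretely, something along these lines (a sketch with placeholder rdds, pasteable into spark-shell; the real pipeline just continues from the cached rdd as before):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("cache-before-union-sketch").setMaster("local[2]"))

// Placeholder rdds standing in for the large rdd A and the small rdd B.
val A = sc.parallelize(1 to 10000, 64).map(i => (i % 100, 1))
val B = sc.parallelize(1 to 100, 8).map(i => (i % 100, 1))

// Workaround: commit A to the cache and force its evaluation with take(1)
// right before the union.
val aCached = A.cache()
aCached.take(1)

val result = aCached.union(B).reduceByKey(_ + _, 32)
result.count()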
Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
Hum, I increased it to 1024 but it doesn't help, I still have the same problem :(
Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
Might be related: what's the value of spark.yarn.executor.memoryOverhead? See SPARK-6085. Cheers
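For reference, a sketch of how that overhead can be raised explicitly; the value is in MB, and 1024 is just an example value, the one that ends up being tried in this thread.

import org.apache.spark.{SparkConf, SparkContext}

// spark.yarn.executor.memoryOverhead is the extra headroom YARN reserves per
// executor container, in MB, on top of spark.executor.memory.
val conf = new SparkConf()
  .set("spark.executor.memory", "4250m")
  .set("spark.yarn.executor.memoryOverhead", "1024")
val sc = new SparkContext(conf)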
Re: Hanging tasks in spark 1.2.1 while working with 1.1.1
The default one, 0.07 of the executor memory. I'll try increasing it and post back the result. Thanks
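Back-of-the-envelope, assuming that 0.07 factor and the 4250m executors used here, the default overhead works out to roughly 300 MB:

// Rough arithmetic for the default overhead with the settings from this thread.
val executorMemoryMb = 4250
val overheadFactor = 0.07 // default factor mentioned above
val defaultOverheadMb = (executorMemoryMb * overheadFactor).toInt // ≈ 297 MB
println(s"default memoryOverhead ≈ $defaultOverheadMb MB per executor")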
Hanging tasks in spark 1.2.1 while working with 1.1.1
Hi, I have a job that hangs after upgrading to spark 1.2.1 from 1.1.1. Strange thing: the exact same code does work (after the upgrade) in the spark-shell. But this information might be misleading, as it works with 1.1.1...

*The job takes as input two data sets:*
- rdd A of +170gb (with less it is hard to reproduce) and more than 11K partitions
- rdd B of +100mb and 32 partitions

I run it via EMR over YARN and use 4 m3.xlarge computing nodes. I am not sure the executor config is relevant here; anyway, I tried with multiple small executors with less RAM and the inverse.

*The job basically does this:*
A.flatMap(...).union(B).keyBy(f).reduceByKey(..., 32).map(...).save
After the flatMap, rdd A's size is much smaller, similar to B.

*Configs I used to run this job:*
storage.memoryFraction: 0
shuffle.memoryFraction: 0.5
akka.timeout 500
akka.frameSize 40 // this one also defines the memory used by the yarn master, but not sure whether it matters here
driver.memory 5g
executor.memory 4250m
I have 7 executors with 2 cores each.

*What happens:*
The job produces two stages: keyBy and save. The keyBy stage runs fine and produces a shuffle write of ~150mb. The save stage, where the shuffle read occurs, hangs. The greater the initial dataset, the more tasks hang. I did run it for much larger datasets with the same config/cluster but without the union, and it worked fine.

*Some more infos and logs:*
Amongst the 4 nodes, 1 finished all its tasks and the running ones are on the 3 other nodes. But I am not sure this is useful information (one node that completed all its work vs the others), as with some smaller dataset I manage to get only one hanging task. Here are the last parts of the executor logs that show some timeouts.

*An executor from node ip-10-182-98-220*
15/03/13 15:43:10 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 66 ms
15/03/13 15:58:44 WARN server.TransportChannelHandler: Exception in connection from /10.181.48.153:56806
java.io.IOException: Connection timed out

*An executor from node ip-10-181-103-186*
15/03/13 15:43:22 INFO storage.ShuffleBlockFetcherIterator: Started 6 remote fetches in 20 ms
15/03/13 15:58:41 WARN server.TransportChannelHandler: Exception in connection from /10.182.98.220:38784
java.io.IOException: Connection timed out

*An executor from node ip-10-181-48-153* (all the logs below belong to this node)
15/03/13 15:43:24 INFO executor.Executor: Finished task 26.0 in stage 1.0 (TID 13860). 802 bytes result sent to driver
15/03/13 15:58:43 WARN server.TransportChannelHandler: Exception in connection from /10.181.103.186:46381
java.io.IOException: Connection timed out

*Followed by many*
15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=2064203432016, chunkIndex=405}, buffer=FileSegmentManagedBuffer{file=/mnt/var/lib/hadoop/tmp/nm-local-dir/usercache/hadoop/appcache/application_1426256247374_0002/spark-1659efcd-c6b6-4a12-894d-e869486d3d00/35/shuffle_0_9885_0.data, offset=8631, length=571}} to /10.181.103.186:46381; closing connection
java.nio.channels.ClosedChannelException

*with the last one being*
15/03/13 15:58:43 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=7377187355282895939, response=[B@6fcd0014} to /10.181.103.186:46381; closing connection
java.nio.channels.ClosedChannelException

The executors from the node that finished its tasks don't show anything special. Note that I don't cache anything, thus I reduced storage.memoryFraction to 0. I see some of those, but don't think they are related:
15/03/13 15:43:15 INFO storage.MemoryStore: Memory use = 0.0 B (blocks) + 0.0 B (scratch space shared across 0 thread(s)) = 0.0 B. Storage limit = 0.0 B.

Sorry for the long email with maybe misleading infos, but I hope it might help to track down what happens and why it was working with spark 1.1.1. Thanks, Eugen
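PS: to make the job shape and the configs above easier to follow, here is a rough, self-contained sketch of the pipeline; the inputs, paths, key and merge functions are placeholders, not the real job (executor memory and cores are set at submit time on YARN).

import org.apache.spark.{SparkConf, SparkContext}

object UnionReduceSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the configs listed above.
    val conf = new SparkConf()
      .setAppName("union-reduce-sketch")
      .set("spark.storage.memoryFraction", "0")
      .set("spark.shuffle.memoryFraction", "0.5")
      .set("spark.akka.timeout", "500")
      .set("spark.akka.frameSize", "40")
    val sc = new SparkContext(conf)

    // Tiny stand-ins: the real rdd A is ~170GB over 11K+ partitions, rdd B ~100MB over 32.
    val A = sc.textFile("/placeholder/input/a")
    val B = sc.parallelize(Seq("b 1", "b 2"), 32)

    // Placeholder key function standing in for keyBy(f).
    def key(line: String): (String, String) = (line.takeWhile(_ != ' '), line)

    A.flatMap(_.split(" "))           // placeholder flatMap
      .union(B)
      .map(key)
      .reduceByKey((l, r) => l, 32)   // placeholder merge function, 32 output partitions
      .map(_._2)
      .saveAsTextFile("/placeholder/output")

    sc.stop()
  }
}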