[ https://issues.apache.org/jira/browse/SPARK-10486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15069054#comment-15069054 ]
Liang Chen commented on SPARK-10486:
------------------------------------

I met the same problem.

> Spark intermittently fails to recover from a worker failure (in standalone mode)
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-10486
>                 URL: https://issues.apache.org/jira/browse/SPARK-10486
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Cheuk Lam
>            Priority: Critical
>
> We have run into a problem where a Spark job is aborted after one worker is killed in a 2-worker standalone cluster. The problem is intermittent, but we can reproduce it consistently. It only appears to happen when we kill a worker; it does not seem to happen when we kill an executor directly. The program we use to reproduce the problem is an iterative program based on GraphX, although the nature of the issue does not seem to be GraphX-related. This is how we reproduce the problem:
> * Set up a standalone cluster of 2 workers;
> * Run a Spark application of some iterative program (ours is based on GraphX);
> * Kill a worker process (and thus the associated executor);
> * Intermittently, some job will be aborted.
> The driver and executor logs are available, as well as the application history (event log file), but they are quite large and can't be attached here.
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> After looking into the log files, we think the failure is caused by the following two things combined:
> * The BlockManagerMasterEndpoint in the driver holds stale block info corresponding to the dead executor after the worker has been killed. The driver does appear to handle the "RemoveExecutor" message and cleans up all related block info. But subsequently, and intermittently, it receives some Akka messages to re-register the dead BlockManager and re-add some of its blocks.
> As a result, upon GetLocations requests from the remaining executor, the driver responds with stale block info, instructing the remaining executor to fetch blocks from the dead executor. Please see the driver log excerpt below, which shows the sequence of events described above. In the log there are two executors: 1.2.3.4 was the one that got shut down, while 5.6.7.8 is the remaining executor. The driver also ran on 5.6.7.8.
> * When the remaining executor's BlockManager issues a doGetRemote() call to fetch the block of data, it fails because the targeted BlockManager, which resided in the dead executor, is gone. This failure results in an exception forwarded to the caller, bypassing the mechanism in the doGetRemote() function that would trigger a re-computation of the block. I don't know whether that is intentional or not.
> Driver log excerpt showing that the driver received messages to re-register the dead executor after handling the RemoveExecutor message:
> 11690 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (172.236378 ms) AkkaMessage(RegisterExecutor(0,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/Executor#670388190]),1.2.3.4:36140,8,Map(stdout -> http://1.2.3.4:8081/logPage/?appId=app-20150902203512-0000&executorId=0&logType=stdout, stderr -> http://1.2.3.4:8081/logPage/?appId=app-20150902203512-0000&executorId=0&logType=stderr)),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$f]
> 11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]
> 11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true)
> 11718 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] INFO BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2 GB RAM, BlockManagerId(0, 1.2.3.4, 52615)
> 11719 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (1.498313 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]
> ...
> 308892 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] ERROR TaskSchedulerImpl: Lost executor 0 on 1.2.3.4: worker lost
> ...
> 308903 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO DAGScheduler: Executor lost: 0 (epoch 178)
> 308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(RemoveExecutor(0),true) from Actor[akka://sparkDriver/temp/$Jqb]
> 308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(RemoveExecutor(0),true)
> 308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
> 308906 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0, 1.2.3.4, 52615)
> 308907 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (2.913003 ms) AkkaMessage(RemoveExecutor(0),true) from Actor[akka://sparkDriver/temp/$Jqb]
> 308908 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO BlockManagerMaster: Removed 0 successfully in removeExecutor
> ...
> 308987 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$rob]
> 308987 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true)
> 308988 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2 GB RAM, BlockManagerId(0, 1.2.3.4, 52615)
> 308988 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (0.324347 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$rob]
> ...
> 309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4, 52615),rdd_1158_5,StorageLevel(false, true, false, true, 1),686264,0,0),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$sob]
> 309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4, 52615),rdd_1158_5,StorageLevel(false, true, false, true, 1),686264,0,0),true)
> 309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] INFO BlockManagerInfo: Added rdd_1158_5 in memory on 1.2.3.4:52615 (size: 670.2 KB, free: 6.2 GB)
> 309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (0.327351 ms) AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4, 52615),rdd_1158_5,StorageLevel(false, true, false, true, 1),686264,0,0),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$sob]
> ...
> 332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(GetLocations(rdd_1158_5),true) from Actor[akka.tcp://sparkExecutor@5.6.7.8:54406/temp/$VCb]
> 332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(GetLocations(rdd_1158_5),true)
> 332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (0.068605 ms) AkkaMessage(GetLocations(rdd_1158_5),true) from Actor[akka.tcp://sparkExecutor@5.6.7.8:54406/temp/$VCb]

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
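The second cause described above is that doGetRemote() forwards the fetch failure to the caller instead of falling through to the "block is missing, recompute it" path. The standalone sketch below illustrates that control-flow difference. It is hypothetical pseudocode, not the actual Spark 1.4 source: the function names, the `locations` callables, and `DeadExecutorError` are stand-ins for the real BlockManager machinery.

```python
class DeadExecutorError(Exception):
    """Stand-in for a connection failure to a BlockManager that no longer exists."""


def fetch_propagating(locations, block_id):
    """Mirrors the reported behavior: the first failure escapes the loop,
    so the caller's recompute-the-block path is never reached."""
    for fetch in locations:
        return fetch(block_id)  # raises if the location is a dead executor
    return None                 # None => caller recomputes the block


def fetch_with_fallback(locations, block_id):
    """Defensive variant: a per-location failure is treated like a miss and the
    next location is tried; only an exhausted list yields None (=> recompute)."""
    for fetch in locations:
        try:
            return fetch(block_id)
        except DeadExecutorError:
            continue            # stale location from the driver; skip it
    return None


def dead(block_id):
    raise DeadExecutorError("connection refused: 1.2.3.4:52615")


def alive(block_id):
    return block_id + "-bytes"


# Stale driver metadata lists the dead executor's BlockManager first,
# as in the GetLocations(rdd_1158_5) exchange in the log above.
locations = [dead, alive]

print(fetch_with_fallback(locations, "rdd_1158_5"))  # rdd_1158_5-bytes
try:
    fetch_propagating(locations, "rdd_1158_5")
except DeadExecutorError as e:
    print("task sees:", e)  # the task fails instead of recomputing the block
```

In the fallback variant the stale location only costs one failed attempt; in the propagating variant it aborts the task, which matches the intermittent job failures reported here.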