[ https://issues.apache.org/jira/browse/SPARK-10486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14744858#comment-14744858 ]

Madhusudanan Kandasamy commented on SPARK-10486:
------------------------------------------------

Can you share a simplified test case that consistently reproduces the issue?


> Spark intermittently fails to recover from a worker failure (in standalone 
> mode)
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-10486
>                 URL: https://issues.apache.org/jira/browse/SPARK-10486
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Cheuk Lam
>            Priority: Critical
>
> We have run into a problem where a Spark job is aborted after one worker 
> is killed in a 2-worker standalone cluster.  The problem is intermittent, 
> but we can reproduce it reliably.  It only appears to happen when we kill a 
> worker; it doesn't seem to happen when we kill an executor directly.
> The program we use to reproduce the problem is an iterative program based 
> on GraphX, although the nature of the issue doesn't seem to be 
> GraphX-specific.  This is how we reproduce the problem:
> * Set up a standalone cluster of 2 workers;
> * Run a Spark application of some iterative program (ours is based on 
> GraphX; a minimal sketch follows this list);
> * Kill a worker process (and thus the associated executor);
> * Intermittently some job will be aborted.
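> Below is a minimal sketch of the kind of iterative job we mean 
> (hypothetical code, not our actual GraphX reproducer; any loop that caches 
> a new RDD per iteration should exercise the same recovery path):
> {code:scala}
> import org.apache.spark.{SparkConf, SparkContext}
>
> // Hypothetical reproducer: caches a new RDD each iteration, similar in
> // shape to GraphX's Pregel loop.
> object WorkerKillRepro {
>   def main(args: Array[String]): Unit = {
>     val sc = new SparkContext(new SparkConf().setAppName("WorkerKillRepro"))
>     var rdd = sc.parallelize(1 to 1000000, 16).cache()
>     for (i <- 1 to 100) {
>       rdd = rdd.map(_ + 1).cache()
>       println(s"iteration $i, sum = ${rdd.sum()}")
>       // Kill one worker process while the loop runs; Spark should recover
>       // by recomputing lost blocks, but intermittently the job is aborted.
>     }
>     sc.stop()
>   }
> }
> {code}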
> The driver and the executor logs are available, as well as the application 
> history (event log file).  But they are quite large and can't be attached 
> here.
> ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
> After looking into the log files, we think the failure is caused by the 
> following two things combined:
> * The BlockManagerMasterEndpoint in the driver holds stale block info for 
> the dead executor after the worker has been killed.  The driver does appear 
> to handle the "RemoveExecutor" message and clean up all related block info. 
> But subsequently, and intermittently, it receives Akka messages that 
> re-register the dead BlockManager and re-add some of its blocks.  As a 
> result, upon GetLocations requests from the remaining executor, the driver 
> responds with stale block info, instructing the remaining executor to fetch 
> blocks from the dead executor.  Please see the driver log excerpt below, 
> which shows this sequence of events.  In the log, there are two executors: 
> 1.2.3.4 is the one that got shut down, while 5.6.7.8 is the remaining 
> executor.  The driver also ran on 5.6.7.8.
> * When the remaining executor's BlockManager issues a doGetRemote() call to 
> fetch the block of data, it fails because the targeted BlockManager, which 
> resided in the dead executor, is gone.  This failure surfaces as an 
> exception forwarded to the caller, bypassing the mechanism in doGetRemote() 
> that would otherwise trigger a re-computation of the block.  I don't know 
> whether that is intentional.  (A toy model of both behaviors follows this 
> list.)
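> To make the suspected interaction concrete, here is a toy model of the two 
> behaviors (hypothetical names and deliberately simplified logic; this is 
> not the actual BlockManagerMasterEndpoint or BlockManager source):
> {code:scala}
> import scala.collection.mutable
> import scala.util.Try
>
> case class BMId(executorId: String, host: String, port: Int)
>
> // Toy model of the master's block-location state: removeExecutor() cleans
> // up, but a delayed re-registration from the dead executor re-inserts a
> // stale location, which getLocations() then hands back out.
> class ToyMaster {
>   private val locations = mutable.Map.empty[String, mutable.Set[BMId]]
>
>   def removeExecutor(execId: String): Unit =
>     locations.values.foreach(_.retain(_.executorId != execId))
>
>   def registerAndAdd(id: BMId, block: String): Unit =
>     locations.getOrElseUpdate(block, mutable.Set.empty) += id
>
>   def getLocations(block: String): Seq[BMId] =
>     locations.getOrElse(block, mutable.Set.empty[BMId]).toSeq
> }
>
> object RaceDemo extends App {
>   // Toy model of the fetch path: if fetching from a stale location throws
>   // instead of returning null, the exception propagates to the caller and
>   // the None branch that would trigger recomputation is never reached.
>   def doGetRemoteToy(locs: Seq[BMId],
>                      fetch: BMId => Array[Byte]): Option[Array[Byte]] = {
>     for (loc <- locs) {
>       val data = fetch(loc) // throws for a dead executor; nothing catches it
>       if (data != null) return Some(data)
>     }
>     None // reached only if all fetches return null; recomputation upstream
>   }
>
>   def fetchFromDead(id: BMId): Array[Byte] =
>     throw new RuntimeException("executor " + id.executorId + " is gone")
>
>   val master = new ToyMaster
>   val dead = BMId("0", "1.2.3.4", 52615)
>   master.registerAndAdd(dead, "rdd_1158_5")
>   master.removeExecutor("0")                 // RemoveExecutor handled: clean
>   master.registerAndAdd(dead, "rdd_1158_5")  // late RegisterBlockManager
>   val stale = master.getLocations("rdd_1158_5")
>   println(stale)                             // still lists the dead executor
>   println(Try(doGetRemoteToy(stale, fetchFromDead))) // Failure(...), not None
> }
> {code}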
> Driver log excerpt showing that the driver received messages to 
> re-register the dead executor after handling the RemoveExecutor message:
> 11690 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message 
> (172.236378 ms) 
> AkkaMessage(RegisterExecutor(0,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/Executor#670388190]),1.2.3.4:36140,8,Map(stdout
>  -> 
> http://1.2.3.4:8081/logPage/?appId=app-20150902203512-0000&executorId=0&logType=stdout,
>  stderr -> 
> http://1.2.3.4:8081/logPage/?appId=app-20150902203512-0000&executorId=0&logType=stderr)),true)
>  from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$f]
> 11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message 
> AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
> 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true)
>  from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]
> 11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: 
> AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
> 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true)
> 11718 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] INFO 
> BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2 
> GB RAM, BlockManagerId(0, 1.2.3.4, 52615)
> 11719 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message 
> (1.498313 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
> 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true)
>  from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]
> ...
> 308892 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] ERROR 
> TaskSchedulerImpl: Lost executor 0 on 1.2.3.4: worker lost
> ...
> 308903 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO DAGScheduler: 
> Executor lost: 0 (epoch 178)
> 308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message 
> AkkaMessage(RemoveExecutor(0),true) from Actor[akka://sparkDriver/temp/$Jqb]
> 308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: 
> AkkaMessage(RemoveExecutor(0),true)
> 308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO 
> BlockManagerMasterEndpoint: Trying to remove executor 0 from 
> BlockManagerMaster.
> 308906 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO 
> BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0, 1.2.3.4, 
> 52615)
> 308907 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message 
> (2.913003 ms) AkkaMessage(RemoveExecutor(0),true) from 
> Actor[akka://sparkDriver/temp/$Jqb]
> 308908 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO BlockManagerMaster: 
> Removed 0 successfully in removeExecutor
> ...
> 308987 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message 
> AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
> 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true)
>  from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$rob]
> 308987 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: 
> AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
> 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true)
> 308988 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO 
> BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2 
> GB RAM, BlockManagerId(0, 1.2.3.4, 52615)
> 308988 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message 
> (0.324347 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 
> 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-216777735])),true)
>  from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$rob]
> ...
> 309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message 
> AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4, 
> 52615),rdd_1158_5,StorageLevel(false, true, false, true, 1),686264,0,0),true) 
> from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$sob]
> 309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: 
> AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4, 
> 52615),rdd_1158_5,StorageLevel(false, true, false, true, 1),686264,0,0),true)
> 309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] INFO 
> BlockManagerInfo: Added rdd_1158_5 in memory on 1.2.3.4:52615 (size: 670.2 
> KB, free: 6.2 GB)
> 309003 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message 
> (0.327351 ms) AkkaMessage(UpdateBlockInfo(BlockManagerId(0, 1.2.3.4, 
> 52615),rdd_1158_5,StorageLevel(false, true, false, true, 1),686264,0,0),true) 
> from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$sob]
> ...
> 332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message 
> AkkaMessage(GetLocations(rdd_1158_5),true) from 
> Actor[akka.tcp://sparkExecutor@5.6.7.8:54406/temp/$VCb]
> 332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: 
> AkkaMessage(GetLocations(rdd_1158_5),true)
> 332822 15/09/02 20:40:37 [sparkDriver-akka.actor.default-dispatcher-4] DEBUG 
> AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message 
> (0.068605 ms) AkkaMessage(GetLocations(rdd_1158_5),true) from 
> Actor[akka.tcp://sparkExecutor@5.6.7.8:54406/temp/$VCb]


