Spark intermittently fails to recover from a worker failure (in standalone mode)
We have run into a problem where a Spark job is aborted after a worker is killed in a 2-worker standalone cluster. The problem is intermittent, but we can reproduce it consistently. It only appears to happen when we kill a worker; it doesn't happen when we kill an executor directly. Has anyone run into a similar problem? We would appreciate it if anyone could share related experience and/or suggestions for troubleshooting this. Thank you.

For those who are interested, we did look into the logs, and this is our analysis so far. We think the failure is caused by the following two things combined, but we don't know how the first one could happen.

* The BlockManagerMasterEndpoint in the driver holds stale block info for the dead executor after the worker has been killed. The driver does appear to handle the "RemoveExecutor" message and clean up all related block info. But subsequently, and intermittently, it receives Akka messages to re-register the dead BlockManager and re-add some of its blocks. As a result, upon GetLocations requests from the remaining executor, the driver responds with stale block info, instructing the remaining executor to fetch blocks from the dead executor. Please see the driver log excerpt below, which shows the sequence of events described above. In the log there are two executors: 1.2.3.4 is the one that was shut down, while 5.6.7.8 is the remaining executor. The driver also ran on 5.6.7.8.

* When the remaining executor's BlockManager issues a doGetRemote() call to fetch the block, the call fails because the target BlockManager on the dead executor is gone. The failure surfaces as an exception forwarded to the caller, bypassing the mechanism in doGetRemote() that would otherwise trigger a re-computation of the block. We don't know whether that is intentional or not. (A simplified sketch of this fetch path follows the log excerpt below.)
Driver log excerpt showing that the driver received messages to re-register the dead BlockManager after handling the RemoveExecutor message:

11690 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (172.236378 ms) AkkaMessage(RegisterExecutor(0,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/Executor#670388190]),1.2.3.4:36140,8,Map(stdout -> http://1.2.3.4:8081/logPage/?appId=app-20150902203512-=0=stdout, stderr -> http://1.2.3.4:8081/logPage/?appId=app-20150902203512-=0=stderr)),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$f]
11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]
11717 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true)
11718 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] INFO BlockManagerMasterEndpoint: Registering block manager 1.2.3.4:52615 with 6.2 GB RAM, BlockManagerId(0, 1.2.3.4, 52615)
11719 15/09/02 20:35:16 [sparkDriver-akka.actor.default-dispatcher-15] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] handled message (1.498313 ms) AkkaMessage(RegisterBlockManager(BlockManagerId(0, 1.2.3.4, 52615),6667936727,AkkaRpcEndpointRef(Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/user/BlockManagerEndpoint1#-21635])),true) from Actor[akka.tcp://sparkExecutor@1.2.3.4:36140/temp/$g]
...
308892 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] ERROR TaskSchedulerImpl: Lost executor 0 on 1.2.3.4: worker lost
...
308903 15/09/02 20:40:13 [dag-scheduler-event-loop] INFO DAGScheduler: Executor lost: 0 (epoch 178)
308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: [actor] received message AkkaMessage(RemoveExecutor(0),true) from Actor[akka://sparkDriver/temp/$Jqb]
308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] DEBUG AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1: Received RPC message: AkkaMessage(RemoveExecutor(0),true)
308904 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO BlockManagerMasterEndpoint: Trying to remove executor 0 from BlockManagerMaster.
308906 15/09/02 20:40:13 [sparkDriver-akka.actor.default-dispatcher-3] INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(0, 1.2.3.4, 52615)
308907 15/09/02 20:40:13
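To illustrate the second point, here is a minimal, self-contained sketch (NOT the actual Spark source; every helper name other than doGetRemote() is our own stand-in) of the fetch path we believe we are hitting: the driver still reports the dead executor as a block location, the network fetch throws, and the exception propagates to the caller instead of falling through to a "recompute the block" path.

case class BlockManagerId(executorId: String, host: String, port: Int)

object DoGetRemoteSketch {

  // Stand-in for asking the driver's BlockManagerMaster for block locations:
  // because of the stale re-registration, it still returns the dead executor.
  def getLocations(blockId: String): Seq[BlockManagerId] =
    Seq(BlockManagerId("0", "1.2.3.4", 52615))

  // Stand-in for the network fetch; it throws because the peer is gone.
  def fetchBlockSync(loc: BlockManagerId, blockId: String): Array[Byte] =
    throw new java.io.IOException(s"Failed to connect to ${loc.host}:${loc.port}")

  // Simplified doGetRemote(): the intent is to return None when no location
  // works, so the caller can fall back to recomputing the block.
  def doGetRemote(blockId: String): Option[Array[Byte]] = {
    for (loc <- getLocations(blockId)) {
      // The exception thrown by fetchBlockSync() propagates straight out of
      // this loop, so the caller sees a failure instead of None and never
      // reaches its recompute fallback.
      val data = fetchBlockSync(loc, blockId)
      if (data != null) return Some(data)
    }
    None
  }

  def main(args: Array[String]): Unit = {
    doGetRemote("rdd_5_3") // throws java.io.IOException in this sketch
  }
}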
Re: in GraphX, program with Pregel runs slower and slower after several iterations
I think you're exactly right. I once had 100 iterations in a single Pregel call and ran into the lineage problem right there. I had to modify the Pregel function and checkpoint both the graph and the newVerts RDD there to cut off the lineage. If you draw out the dependency graph among g, the newVerts RDD and the messages RDD inside the Pregel loop, you will see that we need to checkpoint two things to really cut off the lineage: the graph itself and one of newVerts or messages. This is how I did it inside the Pregel loop:

  ...
  prevG = g
  g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
  g.cache()
  if (i % 50 == 0) {
    g.checkpoint()
    newVerts.checkpoint()
  }
  ...

Also note: checkpointing is only effective before the RDD is materialized. If you checkpoint outside of Pregel, the graph has already been materialized (by the mapReduceTriplets call), so nothing will happen. You can verify that by looking at RDD.toDebugString. Therefore, I had to apply the following workaround:

  val clonedGraph = graph.mapVertices((vid, vd) => vd).mapEdges(edge => edge.attr)
  clonedGraph.checkpoint()
  graph = clonedGraph
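One practical note: checkpoint() requires a checkpoint directory to have been set on the SparkContext beforehand. A minimal setup sketch (the HDFS path is just a placeholder):

  import org.apache.spark.{SparkConf, SparkContext}

  // Use a reliable shared filesystem (e.g. HDFS) for checkpoints in a real cluster.
  val sc = new SparkContext(new SparkConf().setAppName("pregel-checkpointing"))
  sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")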
Re: in GraphX, program with Pregel runs slower and slower after several iterations
I've been encountering something similar too. I suspected it was related to the lineage growth of the graph/RDDs, so I now checkpoint the graph every 60 Pregel rounds, after which my program no longer slows down (except that each checkpoint takes some extra time).
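Roughly, the pattern looks like the following, shown here for a hand-rolled iteration loop rather than the built-in Pregel (a simplified sketch; `step` is a placeholder for the actual per-round update, and the interval is whatever works for your job):

  import org.apache.spark.graphx._

  val checkpointInterval = 60

  // Checkpoint every N rounds. The important part is to checkpoint the new graph
  // before it is materialized, then force an action so the checkpoint (and the
  // lineage cut) actually happens.
  def iterate(initial: Graph[Double, Double], numRounds: Int,
              step: Graph[Double, Double] => Graph[Double, Double]): Graph[Double, Double] = {
    var g = initial
    for (i <- 1 to numRounds) {
      g = step(g).cache()
      if (i % checkpointInterval == 0) {
        g.checkpoint()
        g.vertices.count() // materialize so the checkpoint is written now
        g.edges.count()
      }
    }
    g
  }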
How to delete graph checkpoints?
This is a question about checkpointing on GraphX. We'd like to automate deleting the checkpoint files of old graphs. The RDD class has a getCheckpointFile() function, which lets us retrieve the checkpoint file of an old RDD and then delete it. However, I couldn't find a way to get hold of the corresponding checkpointed RDDs given the graph reference, because the checkpoint of a GraphImpl is really applied to the underlying partitionsRDD of both the VertexRDD and the EdgeRDD, and that partitionsRDD as defined today doesn't seem to be accessible from outside of graphx. Below is the declaration in VertexRDD.scala:

  private[graphx] def partitionsRDD: RDD[ShippableVertexPartition[VD]]

We would really appreciate it if anyone could shed some light on this, or if anyone who has come across a similar problem could share a solution or workaround.

Thank you,
Cheuk Lam
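In the meantime, the workaround we are considering is to not rely on getCheckpointFile() at all: rotate the checkpoint directory ourselves, and delete the previous round's directory with the Hadoop FileSystem API once the new checkpoint has materialized. A rough sketch (the base path and helper name are just for illustration, and deleting the old directory is only safe once nothing still reads those files):

  import org.apache.hadoop.fs.Path
  import org.apache.spark.SparkContext
  import org.apache.spark.graphx.Graph

  val checkpointBase = "hdfs:///tmp/graph-checkpoints" // placeholder base path
  var previousDir: Option[Path] = None

  // Checkpoint `graph` into a fresh per-round directory, then delete the directory
  // used by the previous round. Assumes the new checkpoint fully replaces the old
  // one, i.e. no live RDD still depends on the old checkpoint files.
  def checkpointAndClean[VD, ED](sc: SparkContext, graph: Graph[VD, ED], round: Int): Unit = {
    val currentDir = new Path(s"$checkpointBase/round-$round")
    sc.setCheckpointDir(currentDir.toString)
    graph.checkpoint()
    graph.vertices.count() // force materialization so the checkpoint files are written
    graph.edges.count()

    previousDir.foreach { dir =>
      dir.getFileSystem(sc.hadoopConfiguration).delete(dir, true)
    }
    previousDir = Some(currentDir)
  }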
Re: Spark in cluster and errors
I wasn't the original person who posted the question, but this helped me! :) Thank you. I had a similar issue today when I tried to connect using the IP address (spark://master_ip:7077). I resolved it by replacing the IP with the URL displayed in the Spark web console - in my case that is (spark://master_short_hostname:7077).
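For anyone hitting the same thing, a minimal illustration (the hostname is a placeholder; copy the spark:// URL exactly as the master's web UI shows it):

  import org.apache.spark.{SparkConf, SparkContext}

  // The master URL must match what the standalone master advertises, which is the
  // spark:// URL shown at the top of its web UI (port 8080 by default).
  val conf = new SparkConf()
    .setAppName("connect-demo")
    .setMaster("spark://master_short_hostname:7077")
  val sc = new SparkContext(conf)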
Performance with activeSetOpt in GraphImpl.mapReduceTriplets()
When using the activeSetOpt in GraphImpl.mapReduceTriplets(), can we expect performance that is proportional only to the size of the active set and independent of the size of the original data set? Or is there still a fixed overhead that depends on the size of the original data set? Thank you!
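For concreteness, this is roughly the kind of call the question is about (a sketch against the Spark 1.x API; the graph and active-set values are placeholders):

  import org.apache.spark.graphx._

  // Send one message along each triplet with an endpoint in `activeVerts`
  // (when activeSetOpt is supplied) and count messages per destination vertex.
  def countFromActive(graph: Graph[Int, Int], activeVerts: VertexRDD[Int]): VertexRDD[Int] =
    graph.mapReduceTriplets[Int](
      triplet => Iterator((triplet.dstId, 1)),
      (a, b) => a + b,
      activeSetOpt = Some((activeVerts, EdgeDirection.Either)))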
Pregel messages serialized in local machine?
This is a question on using the Pregel function in GraphX. Does a message get serialized and then de-serialized in the scenario where both the source and the destination vertices are in the same compute node/machine? Thank you!