Hi Richard,

I have actually applied the following fix to our 1.4.0 version and this seems to resolve the zombies :)
https://github.com/apache/spark/pull/7077/files

Sjoerd

2015-06-26 20:08 GMT+02:00 Richard Marscher <rmarsc...@localytics.com>:

> Hi,
>
> we are on 1.3.1 right now, so in case there are differences in the Spark
> files I'll walk through the logic of what we did and post a couple of
> gists at the end. We haven't committed to forking Spark for our own
> deployments yet, so right now we shadow some Spark classes in our
> application code with our versions of the classes. Keep in mind I am not a
> Spark committer, so the following is a best-effort approach that is
> working for us. Someone more knowledgeable about the Spark codebase might
> see a pitfall in my solution or a better solution.
>
> --
>
> First, we'll start with the root issue in TaskSchedulerImpl. You will find
> the code that prints the "Initial job has not accepted any resources"
> warning inside the "submitTasks" function. Spark creates a separate thread
> that checks some conditions every "STARVATION_TIMEOUT" milliseconds until
> the submitted task set has been launched. It only posts the warning log
> here and does nothing else. I will come back to this part of the code in a
> moment.
>
> The code that determines when the "hasLaunchedTask" flag gets set (and
> thus closes out the starvation thread, because the task set is being
> worked on by the cluster) is within the "resourceOffers" function. The
> various Spark scheduler backend classes will periodically call this
> function in TaskSchedulerImpl until cluster resources have been assigned
> to the task set.
>
> To start signaling the zombied scenario, I created a new flag: "@volatile
> private var hasZombied = false". In our experience we always get the
> resources in resourceOffers before the starvation thread runs; whenever
> resources weren't allocated by then, we have always hit the zombie
> scenario. So I added a conditional before the "if (tasks.size > 0) {
> hasLaunchedTask = true }" block. The conditional checks
> "if (!hasLaunchedTask && hasZombied) { dagScheduler.ourCustomFunction() }".
> I'll explain that DAGScheduler call in a moment.
>
> The last detail here is to add code inside the starvation thread block
> after it posts the warning log: set "hasZombied" to true and then call
> "this.cancel()" to stop the starvation thread from continuing to run. With
> this we now have all the steps needed inside TaskSchedulerImpl to signal
> the zombied condition.
>
> Back to the custom function. DAGScheduler has a reference to the
> appropriate Spark listeners that can propagate errors to the task set and,
> more importantly, back to your application code. If you look at the
> DAGScheduler class, you will find a function called
> "cleanUpAfterSchedulerStop()". This function does everything we want,
> except it is hard-coded to a specific exception: "val error = new
> SparkException(...)". What I did was copy this into another function that
> raises a custom exception I created to signal the zombie, something like
> SparkTaskResourceAllocationZombieError. Now you call this function within
> the conditional block in TaskSchedulerImpl.resourceOffers and you should
> see your exception propagating out to your application code so you can
> take appropriate actions.
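> To make the shape of these changes concrete, here is a rough sketch of the
> modified fragments (paraphrased and abbreviated, not a drop-in patch;
> ourCustomFunction and SparkTaskResourceAllocationZombieError are the
> placeholder names from above, and the actual code is in the gists linked
> below):
>
>   // In TaskSchedulerImpl: new flag to signal the zombied condition
>   @volatile private var hasZombied = false
>
>   // In submitTasks, inside the starvation-check TimerTask:
>   override def run() {
>     if (!hasLaunchedTask) {
>       logWarning("Initial job has not accepted any resources; " +
>         "check your cluster UI to ensure that workers are registered " +
>         "and have sufficient resources")
>       hasZombied = true   // added: mark the zombie scenario
>       this.cancel()       // added: stop the starvation thread
>     } else {
>       this.cancel()
>     }
>   }
>
>   // In resourceOffers, just before the existing block that sets
>   // hasLaunchedTask:
>   if (!hasLaunchedTask && hasZombied) {
>     dagScheduler.ourCustomFunction()   // propagate the zombie error
>   }
>   if (tasks.size > 0) {
>     hasLaunchedTask = true
>   }
>
>   // Custom exception type, so application code can match on it
>   class SparkTaskResourceAllocationZombieError(message: String)
>     extends SparkException(message)
>
>   // In DAGScheduler: a copy of cleanUpAfterSchedulerStop() that swaps in
>   // the custom exception instead of the hard-coded SparkException
>   private[scheduler] def ourCustomFunction() {
>     val error = new SparkTaskResourceAllocationZombieError(
>       "Initial job has not accepted any resources before starvation timeout")
>     for (job <- activeJobs) {
>       job.listener.jobFailed(error)
>     }
>   }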
> In our case, we are submitting Spark applications programmatically from a
> Scala application service on an EC2 instance to a Spark Standalone cluster
> in EC2. Whenever we see this error, the application service EC2 instance
> is unable to get resources from the cluster for a long period of time,
> even when attempting subsequent Spark applications (it eventually recovers
> hours or days later, but that is not useful for us). So in our case we
> need to reschedule the failed Spark application on another EC2 application
> instance and shut down the current EC2 instance, because it can no longer
> get cluster resources. Your use case may be different, but at least action
> can be taken at the application level.
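> For illustration only, the application-side handling could look roughly
> like this (runSparkJob, rescheduleOnAnotherInstance and
> shutDownThisInstance are hypothetical stand-ins for your own service
> logic):
>
>   try {
>     runSparkJob()   // normal programmatic submission to the cluster
>   } catch {
>     case e: SparkTaskResourceAllocationZombieError =>
>       // This instance can no longer obtain cluster resources, so hand
>       // the work to another instance and retire this one.
>       rescheduleOnAnotherInstance()
>       shutDownThisInstance()
>   }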
> Here is some source code. You should be able to locate most of my
> additions to the code by searching for comments starting with
> "// Localytics Code".
>
> TaskSchedulerImpl gist: https://gist.github.com/rmarsch/e5d298e582ab75957957
> DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7
>
> Regards,
> Richard
>
> On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder <sjoerdmul...@gmail.com>
> wrote:
>
>> Hi Richard,
>>
>> I would like to see how we can get a workaround to get out of the zombie
>> situation, since we're planning for production :)
>>
>> If you could share the workaround or point me in the right direction,
>> that would be great!
>>
>> Sjoerd
>>
>> 2015-06-26 16:53 GMT+02:00 Richard Marscher <rmarsc...@localytics.com>:
>>
>>> We've seen this issue as well in production. We also aren't sure what
>>> causes it, but have just recently shaded some of the Spark code in
>>> TaskSchedulerImpl that we use to effectively bubble up an exception from
>>> Spark instead of zombieing in this situation. If you are interested I
>>> can go into more detail about that. Otherwise I'm also keen to find out
>>> more about how this might be happening.
>>>
>>> On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder <sjoerdmul...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a really annoying issue that I cannot replicate consistently,
>>>> yet it happens every +-100 submissions (it's a job that runs every 3
>>>> minutes). I already reported an issue for this:
>>>> https://issues.apache.org/jira/browse/SPARK-8592
>>>>
>>>> Here are the thread dumps of the driver and the executor:
>>>> https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ
>>>>
>>>> Any direction I should look into?
>>>>
>>>> Spark 1.4.0
>>>> Java 1.8.0_45 (Oracle Corporation)
>>>> Scala 2.11.6
>>>>
>>>> I already tried to resolve the NPE by not logging the ActorRef. This
>>>> makes the NPE go away :)
>>>>
>>>> But I expect the root cause lies deeper, since the driver still hangs
>>>> with the "WARN TaskSchedulerImpl: Initial job has not accepted any
>>>> resources; check your cluster UI to ensure that workers are registered
>>>> and have sufficient resources" messages. But there are enough resources
>>>> available in the cluster; it has plenty of CPU and memory left.
>>>>
>>>> Logs from Driver:
>>>>
>>>> 15/06/26 11:58:19 INFO Remoting: Starting remoting
>>>> 15/06/26 11:58:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.17.0.123:51415]
>>>> 15/06/26 11:58:19 INFO Utils: Successfully started service 'sparkDriver' on port 51415.
>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
>>>> 15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
>>>> 15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
>>>> 15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
>>>> 15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file server' on port 33176.
>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
>>>> 15/06/26 11:58:20 INFO SparkUI: Started SparkUI at http://172.17.0.123:4040
>>>> 15/06/26 11:58:20 INFO SparkContext: Added JAR file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with timestamp 1435319900257
>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark cluster with app ID app-20150626115820-0917
>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added: app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (10.0.7.171:47050) with 1 cores
>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores, 2.0 GB RAM
>>>> 15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative execution thread
>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated: app-20150626115820-0917/0 is now LOADING
>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated: app-20150626115820-0917/0 is now RUNNING
>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
>>>> 15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on 52000
>>>> 15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register BlockManager
>>>> 15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver, 172.17.0.123, 52000)
>>>> 15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.0
>>>> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
>>>> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
>>>> 15/06/26 11:58:24 INFO SparkContext: Starting job: map at SparkProductEventAggregator.scala:144
>>>> 15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1 [5cc3f53084]
>>>> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
>>>> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Registering RDD 5 (map at SparkProductEventAggregator.scala:144)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Got job 0 (map at SparkProductEventAggregator.scala:144) with 200 output partitions (allowLocal=false)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Final stage: ResultStage 1(map at SparkProductEventAggregator.scala:144)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[5] at map at SparkProductEventAggregator.scala:144), which has no missing parents
>>>> 15/06/26 11:58:24 INFO MemoryStore: ensureFreeSpace(12384) called with curMem=0, maxMem=278019440
>>>> 15/06/26 11:58:24 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 12.1 KB, free 265.1 MB)
>>>> 15/06/26 11:58:24 INFO MemoryStore: ensureFreeSpace(5542) called with curMem=12384, maxMem=278019440
>>>> 15/06/26 11:58:24 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 5.4 KB, free 265.1 MB)
>>>> 15/06/26 11:58:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 172.17.0.123:52000 (size: 5.4 KB, free: 265.1 MB)
>>>> 15/06/26 11:58:24 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:874
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Submitting 5 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[5] at map at SparkProductEventAggregator.scala:144)
>>>> 15/06/26 11:58:24 INFO TaskSchedulerImpl: Adding task set 0.0 with 5 tasks
>>>> 15/06/26 11:58:39 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>> 15/06/26 11:58:54 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>> 15/06/26 11:59:09 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
>>>>
>>>> Sjoerd