Hi Richard,

I have actually applied the following fix to our 1.4.0 version and it seems
to resolve the zombies :)

https://github.com/apache/spark/pull/7077/files

Sjoerd

2015-06-26 20:08 GMT+02:00 Richard Marscher <rmarsc...@localytics.com>:

> Hi,
>
> we are on 1.3.1 right now, so in case there are differences in the Spark
> files I'll walk through the logic of what we did and post a couple of gists
> at the end. We haven't committed to forking Spark for our own deployments
> yet, so for now we shadow some Spark classes in our application code with
> our own versions of the classes. Keep in mind I am not a Spark committer,
> so the following is a best-effort approach that is working for us; someone
> more knowledgeable about the Spark codebase might see a pitfall in my
> solution, or a better one.
>
> --
>
> First, we'll start with the root issue in TaskSchedulerImpl. You will find
> the code that prints the "Initial job has not accepted any resources"
> warning inside the "submitTasks" function. Spark creates a separate thread
> that checks some conditions every "STARVATION_TIMEOUT" milliseconds until
> the submitted task set has been launched. It only posts the warning log
> here and takes no further action. I will come back to this part of the
> code in a moment.
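>
> For reference, that part of submitTasks in 1.3.x looks roughly like the
> sketch below (paraphrased and trimmed to the relevant lines, so
> double-check against the actual source for your Spark version):
>
>   // TaskSchedulerImpl.submitTasks (Spark 1.3.x), trimmed
>   override def submitTasks(taskSet: TaskSet) {
>     this.synchronized {
>       // ... create and register the TaskSetManager as usual ...
>       if (!isLocal && !hasReceivedTask) {
>         starvationTimer.scheduleAtFixedRate(new TimerTask() {
>           override def run() {
>             if (!hasLaunchedTask) {
>               // only logs; nothing is ever propagated to the application
>               logWarning("Initial job has not accepted any resources; " +
>                 "check your cluster UI to ensure that workers are registered " +
>                 "and have sufficient resources")
>             } else {
>               this.cancel()
>             }
>           }
>         }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
>       }
>       hasReceivedTask = true
>     }
>     backend.reviveOffers()
>   }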
>
> The code that determines when the "hasLaunchedTask" flag gets set (which
> stops the starvation thread and means the task set is being worked on by
> the cluster) is within the "resourceOffers" function. The various Spark
> Scheduler Backend classes will periodically call this function in
> TaskSchedulerImpl until cluster resources have been assigned to the task
> set.
>
> To start signaling the zombied scenario, I created a new flag: "@volatile
> private var hasZombied = false". In our experience, either resourceOffers
> assigns resources before the starvation thread runs, or, if resources
> weren't allocated by then, we always end up in the zombie scenario. So
> I added a conditional before the "if(tasks.size > 0) { hasLaunchedTask =
> true }" block. The conditional checks "if(!hasLaunchedTask && hasZombied) {
> dagScheduler.ourCustomFunction() }". I'll explain that DAGScheduler call in
> a moment.
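>
> Putting those pieces together, the relevant part of resourceOffers looks
> roughly like the sketch below ("ourCustomFunction" is only a placeholder
> for the DAGScheduler method described further down):
>
>   // TaskSchedulerImpl (Spark 1.3.x)
>   // Localytics Code: set by the starvation thread once we appear stuck
>   @volatile private var hasZombied = false
>
>   def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
>     // ... build the "tasks" sequence from the offers as usual ...
>
>     // Localytics Code: the starvation thread already fired, so escalate
>     // instead of waiting forever for resources that never arrive
>     if (!hasLaunchedTask && hasZombied) {
>       dagScheduler.ourCustomFunction()  // placeholder; see DAGScheduler sketch below
>     }
>     if (tasks.size > 0) {
>       hasLaunchedTask = true
>     }
>     return tasks
>   }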
>
> The last detail here is to add code inside the starvation thread block
> after it posts the warning log: set "hasZombied" to true and then call
> "this.cancel()" to stop the starvation thread from running again. With
> this we now have all the pieces needed inside TaskSchedulerImpl to signal
> the zombied condition.
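>
> Concretely, the starvation TimerTask body then becomes something like:
>
>   // Inside the starvation TimerTask in submitTasks (sketch)
>   override def run() {
>     if (!hasLaunchedTask) {
>       logWarning("Initial job has not accepted any resources; " +
>         "check your cluster UI to ensure that workers are registered " +
>         "and have sufficient resources")
>       hasZombied = true  // Localytics Code: remember that we starved
>       this.cancel()      // Localytics Code: stop re-posting the warning
>     } else {
>       this.cancel()
>     }
>   }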
>
> Back to the custom function. DAGScheduler has a reference to the
> appropriate Spark listeners that can propagate errors to the task set and,
> more importantly, back to your application code. If you look at the
> DAGScheduler class, you will find a function called
> "cleanUpAfterSchedulerStop()". This function does everything we want,
> except it is hard-coded to a specific exception: "val error = new
> SparkException(...)". What I did was copy it into another function that
> fails the jobs with a custom exception I created to signal the zombie,
> something like SparkTaskResourceAllocationZombieError. Now you call this
> function within the conditional block in TaskSchedulerImpl.resourceOffers
> and you should see your exception propagate out to your application code so
> you can take appropriate action.
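>
> A minimal sketch of that piece (the class and method names are just the
> ones we picked, and in the real copy you keep the rest of
> cleanUpAfterSchedulerStop()'s body unchanged, swapping only the exception):
>
>   // Custom error type used to signal the zombie back to application code
>   class SparkTaskResourceAllocationZombieError(message: String)
>     extends SparkException(message)
>
>   // In DAGScheduler: copy of cleanUpAfterSchedulerStop() with our exception;
>   // this is the "ourCustomFunction" referenced from resourceOffers above
>   private[scheduler] def cleanUpAfterResourceZombie(): Unit = {
>     for (job <- activeJobs) {
>       val error = new SparkTaskResourceAllocationZombieError(
>         "Job aborted because the initial job never accepted any resources")
>       // failing the job through its listener is what surfaces the exception
>       // to the application code that submitted the job
>       job.listener.jobFailed(error)
>       // ... keep the remaining stage / listener-bus bookkeeping unchanged ...
>     }
>   }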
>
> In our case, we are submitting Spark applications programmatically from a
> Scala application service on an EC2 instance to a Spark Standalone cluster
> in EC2. Whenever we see this error, the application service EC2 instance is
> unable to get resources from the cluster even when attempting subsequent
> Spark applications for a long period of time (it eventually recovers hours
> or days later, but that is not useful for us). So in our case we need to
> reschedule the failed Spark application on another EC2 application instance
> and shut down the current instance, because it can no longer get cluster
> resources. Your use case may be different, but at least action can now be
> taken at the application level.
>
> Here is some source code; you should be able to locate most of my
> additions by searching for comments starting with "// Localytics Code":
> TaskSchedulerImpl gist:
> https://gist.github.com/rmarsch/e5d298e582ab75957957
> DAGScheduler gist: https://gist.github.com/rmarsch/ae8f5bb03b11e8d4f8f7
>
> Regards,
> Richard
>
> On Fri, Jun 26, 2015 at 12:08 PM, Sjoerd Mulder <sjoerdmul...@gmail.com>
> wrote:
>
>> Hi Richard,
>>
>> I would like to see how we can get a workaround for the zombie situation,
>> since we're planning for production :)
>>
>> If you could share the workaround or point directions that would be great!
>>
>> Sjoerd
>>
>> 2015-06-26 16:53 GMT+02:00 Richard Marscher <rmarsc...@localytics.com>:
>>
>>> We've seen this issue as well in production. We also aren't sure what
>>> causes it, but we have recently shaded some of the Spark code in
>>> TaskSchedulerImpl so that it bubbles up an exception from Spark instead of
>>> zombieing in this situation. If you are interested I can go into more
>>> detail about that. Otherwise I'm also keen to find out more about how this
>>> might be happening.
>>>
>>> On Fri, Jun 26, 2015 at 8:28 AM, Sjoerd Mulder <sjoerdmul...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a really annoying issue that I cannot replicate consistently, yet
>>>> it happens roughly every 100 submissions (it's a job that runs every
>>>> 3 minutes).
>>>> I already reported an issue for this:
>>>> https://issues.apache.org/jira/browse/SPARK-8592
>>>>
>>>> Here are the Thread dump of the Driver and the Executor:
>>>> https://docs.google.com/document/d/1x7ZwUzlvRqeJQ12FoGhpLV1zqDAmVsaF2HYhzkPNBKQ
>>>>
>>>> Any direction I should look into?
>>>>
>>>> Spark 1.4.0
>>>> Java 1.8.0_45 (Oracle Corporation)
>>>> Scala 2.11.6
>>>>
>>>> I already tried to resolve the NPE by not logging the ActorRef. This
>>>> makes the NPE go away :)
>>>>
>>>> But I expect the root cause lies deeper, since the driver then still
>>>> hangs with the "*WARN TaskSchedulerImpl: Initial job has not accepted
>>>> any resources; check your cluster UI to ensure that workers are
>>>> registered and have sufficient resources*" messages. But there are
>>>> enough resources available in the cluster; it has plenty of CPU and
>>>> memory left.
>>>>
>>>> Logs from Driver:
>>>>
>>>> 15/06/26 11:58:19 INFO Remoting: Starting remoting
>>>> 15/06/26 11:58:19 INFO Remoting: Remoting started; listening on
>>>> addresses :[akka.tcp://sparkDriver@172.17.0.123:51415]
>>>> 15/06/26 11:58:19 INFO Utils: Successfully started service
>>>> 'sparkDriver' on port 51415.
>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering MapOutputTracker
>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering BlockManagerMaster
>>>> 15/06/26 11:58:20 INFO DiskBlockManager: Created local directory at
>>>> /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/blockmgr-92b1e974-53bb-45a3-b918-916759e14630
>>>> 15/06/26 11:58:20 INFO MemoryStore: MemoryStore started with capacity
>>>> 265.1 MB
>>>> 15/06/26 11:58:20 INFO HttpFileServer: HTTP File server directory is
>>>> /tmp/spark-ff1f5a88-4e1d-4fe0-9c54-890e4174f02a/httpd-f5894293-33aa-4eaa-9740-4a36c054b6c8
>>>> 15/06/26 11:58:20 INFO HttpServer: Starting HTTP Server
>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service 'HTTP file
>>>> server' on port 33176.
>>>> 15/06/26 11:58:20 INFO SparkEnv: Registering OutputCommitCoordinator
>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service 'SparkUI' on
>>>> port 4040.
>>>> 15/06/26 11:58:20 INFO SparkUI: Started SparkUI at
>>>> http://172.17.0.123:4040
>>>> 15/06/26 11:58:20 INFO SparkContext: Added JAR
>>>> file:/opt/jar/spark/spark-job-1.0-SNAPSHOT.jar at
>>>> http://172.17.0.123:33176/jars/spark-job-1.0-SNAPSHOT.jar with
>>>> timestamp 1435319900257
>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Connecting to master
>>>> akka.tcp://sparkMaster@172.17.42.1:7077/user/Master...
>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Connected to Spark
>>>> cluster with app ID app-20150626115820-0917
>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor added:
>>>> app-20150626115820-0917/0 on worker-20150625133752-10.0.7.171-47050 (
>>>> 10.0.7.171:47050) with 1 cores
>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: Granted executor ID
>>>> app-20150626115820-0917/0 on hostPort 10.0.7.171:47050 with 1 cores,
>>>> 2.0 GB RAM
>>>> 15/06/26 11:58:20 INFO TaskSchedulerImpl: Starting speculative
>>>> execution thread
>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
>>>> app-20150626115820-0917/0 is now LOADING
>>>> 15/06/26 11:58:20 INFO AppClient$ClientActor: Executor updated:
>>>> app-20150626115820-0917/0 is now RUNNING
>>>> 15/06/26 11:58:20 INFO Utils: Successfully started service
>>>> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 52000.
>>>> 15/06/26 11:58:20 INFO NettyBlockTransferService: Server created on
>>>> 52000
>>>> 15/06/26 11:58:20 INFO BlockManagerMaster: Trying to register
>>>> BlockManager
>>>> 15/06/26 11:58:20 INFO BlockManagerMasterEndpoint: Registering block
>>>> manager 172.17.0.123:52000 with 265.1 MB RAM, BlockManagerId(driver,
>>>> 172.17.0.123, 52000)
>>>> 15/06/26 11:58:20 INFO BlockManagerMaster: Registered BlockManager
>>>> 15/06/26 11:58:20 INFO SparkDeploySchedulerBackend: SchedulerBackend is
>>>> ready for scheduling beginning after reached minRegisteredResourcesRatio:
>>>> 0.0
>>>> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
>>>> 15/06/26 11:58:24 INFO Exchange: Using SparkSqlSerializer2.
>>>> 15/06/26 11:58:24 INFO SparkContext: Starting job: map at
>>>> SparkProductEventAggregator.scala:144
>>>> 15/06/26 11:58:24 INFO Version: Elasticsearch Hadoop v2.1.0.rc1
>>>> [5cc3f53084]
>>>> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Reading from
>>>> [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
>>>> 15/06/26 11:58:24 INFO ScalaEsRowRDD: Discovered mapping
>>>> {675d42c8-9823-4d3c-8e86-5aa611d38770=[REMOVED]} for
>>>> [675d42c8-9823-4d3c-8e86-5aa611d38770/events]
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Registering RDD 5 (map at
>>>> SparkProductEventAggregator.scala:144)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Got job 0 (map at
>>>> SparkProductEventAggregator.scala:144) with 200 output partitions
>>>> (allowLocal=false)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Final stage: ResultStage 1(map at
>>>> SparkProductEventAggregator.scala:144)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Parents of final stage:
>>>> List(ShuffleMapStage 0)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Missing parents:
>>>> List(ShuffleMapStage 0)
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Submitting ShuffleMapStage 0
>>>> (MapPartitionsRDD[5] at map at SparkProductEventAggregator.scala:144),
>>>> which has no missing parents
>>>> 15/06/26 11:58:24 INFO MemoryStore: ensureFreeSpace(12384) called with
>>>> curMem=0, maxMem=278019440
>>>> 15/06/26 11:58:24 INFO MemoryStore: Block broadcast_0 stored as values
>>>> in memory (estimated size 12.1 KB, free 265.1 MB)
>>>> 15/06/26 11:58:24 INFO MemoryStore: ensureFreeSpace(5542) called with
>>>> curMem=12384, maxMem=278019440
>>>> 15/06/26 11:58:24 INFO MemoryStore: Block broadcast_0_piece0 stored as
>>>> bytes in memory (estimated size 5.4 KB, free 265.1 MB)
>>>> 15/06/26 11:58:24 INFO BlockManagerInfo: Added broadcast_0_piece0 in
>>>> memory on 172.17.0.123:52000 (size: 5.4 KB, free: 265.1 MB)
>>>> 15/06/26 11:58:24 INFO SparkContext: Created broadcast 0 from broadcast
>>>> at DAGScheduler.scala:874
>>>> 15/06/26 11:58:24 INFO DAGScheduler: Submitting 5 missing tasks from
>>>> ShuffleMapStage 0 (MapPartitionsRDD[5] at map at
>>>> SparkProductEventAggregator.scala:144)
>>>> 15/06/26 11:58:24 INFO TaskSchedulerImpl: Adding task set 0.0 with 5
>>>> tasks
>>>> 15/06/26 11:58:39 WARN TaskSchedulerImpl: Initial job has not accepted
>>>> any resources; check your cluster UI to ensure that workers are registered
>>>> and have sufficient resources
>>>> 15/06/26 11:58:54 WARN TaskSchedulerImpl: Initial job has not accepted
>>>> any resources; check your cluster UI to ensure that workers are registered
>>>> and have sufficient resources
>>>> 15/06/26 11:59:09 WARN TaskSchedulerImpl: Initial job has not accepted
>>>> any resources; check your cluster UI to ensure that workers are registered
>>>> and have sufficient resources
>>>>
>>>>
>>>> Sjoerd
>>>>
>>>>
>>>>
>>>>
>>>
>>
>
