I don’t think we necessarily want to do this through the DAGScheduler because 
the worker might also shut down due to some unusual termination condition, like 
the driver node crashing. Can’t we do it at the top of the shutdown hook 
instead? If all the threads are in the same thread pool it might be possible to 
interrupt or stop the whole pool.
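
A rough sketch of what I mean, not actual Spark code: it assumes all task threads live in a single java.util.concurrent.ExecutorService and that the shutdown hook can get a reference to it. The names ShutdownSketch, stopTasksThenDelete, installHook, taskPool and timeoutSecs are made up for illustration, and the 10 second default is arbitrary.

```scala
import java.io.File
import java.util.concurrent.{ExecutorService, TimeUnit}

// Hypothetical helper for illustration; none of this exists in Spark.
object ShutdownSketch {

  // Recursively delete a file or directory.
  // File.listFiles() returns null on an I/O error, hence the Option wrapper.
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    f.delete()
  }

  // Stop the writers first, then clean up.
  // Returns true if the pool terminated within the timeout.
  def stopTasksThenDelete(taskPool: ExecutorService,
                          localDirs: Seq[File],
                          timeoutSecs: Long = 10): Boolean = {
    // For a standard ThreadPoolExecutor, shutdownNow() interrupts the
    // running worker threads and discards tasks that are still queued.
    taskPool.shutdownNow()
    // Best effort only: a task that ignores interrupts keeps running
    // after the timeout, and may keep writing while we delete.
    val stopped = taskPool.awaitTermination(timeoutSecs, TimeUnit.SECONDS)
    localDirs.foreach(deleteRecursively)
    stopped
  }

  // Register the sequence above as a JVM shutdown hook.
  def installHook(taskPool: ExecutorService, localDirs: Seq[File]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread("stop tasks, delete local dirs") {
      override def run(): Unit = {
        stopTasksThenDelete(taskPool, localDirs)
        ()
      }
    })
  }
}
```

Because this runs inside the hook itself, it also covers the case where the worker goes down without the driver ever cancelling anything.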

Matei

On Feb 6, 2014, at 11:30 PM, Andrew Ash <and...@andrewash.com> wrote:

> That's genius.  Of course when a worker is told to shutdown it should
> interrupt its worker threads -- I think that would address this issue.
> 
> Are you thinking to put
> 
> running.map(_.jobId).foreach { handleJobCancellation }
> 
> at the top of the StopDAGScheduler block?
> 
> 
> On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:
> 
>> It's highly likely that the executor's threadpool, which runs the tasks,
>> holds the only threads that write to disk. The tasks are designed to
>> be interrupted when the corresponding job is cancelled. So a reasonably
>> simple way could be to actually cancel the currently active jobs, which
>> would send the signal to the worker to stop the tasks. Currently, the
>> DAGScheduler
>> <https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610>
>> does not seem to actually cancel the jobs, only mark them as failed. So it
>> may be a simple addition.
>> 
>> There may be some complications with the external spilling of shuffle data
>> to disk not stopping immediately when the task is marked for killing. Gotta
>> try it out.
>> 
>> TD
>> 
>> On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <and...@andrewash.com> wrote:
>> 
>>> There is probably just one threadpool that has task threads -- is it
>>> possible to enumerate and interrupt just those?  We may need to string
>>> a reference to that threadpool through to the shutdown thread to make
>>> that happen.
>>> 
>>> 
>>> On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mri...@gmail.com> wrote:
>>> 
>>>> Ideally, interrupting the thread writing to disk should be sufficient
>>>> - though since we are in the middle of shutdown when this is happening,
>>>> it is best effort anyway.
>>>> Identifying which threads to interrupt will be interesting, since most
>>>> of them are driven by threadpools and we can't just list all threads
>>>> and interrupt all of them!
>>>> 
>>>> 
>>>> Regards,
>>>> Mridul
>>>> 
>>>> 
>>>> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <and...@andrewash.com> wrote:
>>>>> I think the solution where we stop the writing threads and then let the
>>>>> deleting threads completely clean up is the best option since the final
>>>>> state doesn't have half-deleted temp dirs scattered across the cluster.
>>>>> 
>>>>> How feasible do you think it'd be to interrupt the other threads?
>>>>> 
>>>>> 
>>>>> On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mri...@gmail.com> wrote:
>>>>> 
>>>>>> Looks like a pathological corner case here - where the delete
>>>>>> thread is not getting run while the OS is busy prioritizing the thread
>>>>>> writing data (probably with heavy gc too).
>>>>>> Ideally, the delete thread would list files, remove them and then fail
>>>>>> when it tries to remove the non-empty directory (since the other thread
>>>>>> might be creating more in parallel).
>>>>>> 
>>>>>> 
>>>>>> Regards,
>>>>>> Mridul
>>>>>> 
>>>>>> 
>>>>>> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <and...@andrewash.com> wrote:
>>>>>>> Got a repro locally on my MBP (the other was on a CentOS machine).
>>>>>>>
>>>>>>> Build spark, run a master and a worker with the sbin/start-all.sh script,
>>>>>>> then run this in a shell:
>>>>>>> 
>>>>>>> import org.apache.spark.storage.StorageLevel._
>>>>>>> val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER)
>>>>>>> s.count
>>>>>>> 
>>>>>>> After about a minute, this line appears in the shell logging output:
>>>>>>>
>>>>>>> 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent heart beats: 57510ms exceeds 45000ms
>>>>>>> 
>>>>>>> Ctrl-C the shell.  In jps there is now a worker, a master, and a
>>>>>>> CoarseGrainedExecutorBackend.
>>>>>>> 
>>>>>>> Run jstack on the CGEBackend JVM, and I got the attached stacktraces.  I
>>>>>>> waited around for 15min then kill -9'd the JVM and restarted the process.
>>>>>>> 
>>>>>>> I wonder if what's happening here is that the threads that are spewing data
>>>>>>> to disk (as that parallelize and persist would do) can write to disk faster
>>>>>>> than the cleanup threads can delete from disk.
>>>>>>> 
>>>>>>> What do you think of that theory?
>>>>>>> 
>>>>>>> 
>>>>>>> Andrew
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mri...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Shutdown hooks should not take 15 mins as you mentioned!
>>>>>>>> On the other hand, how busy was your disk when this was happening
>>>>>>>> (either due to spark or something else)?
>>>>>>>>
>>>>>>>> It might just be that there was a lot of stuff to remove?
>>>>>>>> 
>>>>>>>> Regards,
>>>>>>>> Mridul
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <and...@andrewash.com> wrote:
>>>>>>>>> Hi Spark devs,
>>>>>>>>> 
>>>>>>>>> Occasionally when hitting Ctrl-C in the scala spark shell on 0.9.0, one
>>>>>>>>> of my workers goes dead in the spark master UI.  I'm using the standalone
>>>>>>>>> cluster and didn't ever see this while using 0.8.0 so I think it may be
>>>>>>>>> a regression.
>>>>>>>>>
>>>>>>>>> When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and
>>>>>>>>> jmap -heap, it doesn't respond unless I add the -F force flag.  The heap
>>>>>>>>> isn't full, but there are some interesting bits in the jstack.  Poking
>>>>>>>>> around a little, I think there may be some kind of deadlock in the
>>>>>>>>> shutdown hooks.
>>>>>>>>> 
>>>>>>>>> Below are the threads I think are most interesting:
>>>>>>>>> 
>>>>>>>>> Thread 14308: (state = BLOCKED)
>>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>>>>>>>>> - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
>>>>>>>>> - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
>>>>>>>>> - org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object, scala.Function1) @bci=352, line=81 (Interpreted frame)
>>>>>>>>> - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498 (Interpreted frame)
>>>>>>>>> - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456 (Interpreted frame)
>>>>>>>>> - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237 (Interpreted frame)
>>>>>>>>> - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
>>>>>>>>> - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec() @bci=4, line=386 (Interpreted frame)
>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260 (Compiled frame)
>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask) @bci=10, line=1339 (Compiled frame)
>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue) @bci=11, line=1979 (Compiled frame)
>>>>>>>>> - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107 (Interpreted frame)
>>>>>>>>> 
>>>>>>>>> Thread 3865: (state = BLOCKED)
>>>>>>>>> - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
>>>>>>>>> - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
>>>>>>>>> - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
>>>>>>>>> - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106 (Interpreted frame)
>>>>>>>>> - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted frame)
>>>>>>>>> - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
>>>>>>>>> - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
>>>>>>>>> - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
>>>>>>>>> - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52 (Interpreted frame)
>>>>>>>>> - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
>>>>>>>>> - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> Thread 3987: (state = BLOCKED)
>>>>>>>>> - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
>>>>>>>>> - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
>>>>>>>>> - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
>>>>>>>>> - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1, line=466 (Interpreted frame)
>>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9, line=478 (Compiled frame)
>>>>>>>>> - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File) @bci=4, line=479 (Compiled frame)
>>>>>>>>> - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object) @bci=5, line=478 (Compiled frame)
>>>>>>>>> - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>>>> - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2, line=34 (Compiled frame)
>>>>>>>>> - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19, line=478 (Interpreted frame)
>>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File) @bci=14, line=141 (Interpreted frame)
>>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object) @bci=5, line=139 (Interpreted frame)
>>>>>>>>> - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>>>>>> - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Interpreted frame)
>>>>>>>>> - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39, line=139 (Interpreted frame)
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> I think what happened here is that thread 14308 received the akka
>>>>>>>>> "shutdown" message and called System.exit().  This started thread 3865,
>>>>>>>>> which is the JVM shutting itself down.  Part of that process is running
>>>>>>>>> the shutdown hooks, so it started thread 3987.  That thread is the
>>>>>>>>> shutdown hook from addShutdownHook() in DiskBlockManager.scala, which
>>>>>>>>> looks like this:
>>>>>>>>> 
>>>>>>>>>  private def addShutdownHook() {
>>>>>>>>>    localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
>>>>>>>>>    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
>>>>>>>>>      override def run() {
>>>>>>>>>        logDebug("Shutdown hook called")
>>>>>>>>>        localDirs.foreach { localDir =>
>>>>>>>>>          try {
>>>>>>>>>            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
>>>>>>>>>          } catch {
>>>>>>>>>            case t: Throwable =>
>>>>>>>>>              logError("Exception while deleting local spark dir: " + localDir, t)
>>>>>>>>>          }
>>>>>>>>>        }
>>>>>>>>>
>>>>>>>>>        if (shuffleSender != null) {
>>>>>>>>>          shuffleSender.stop()
>>>>>>>>>        }
>>>>>>>>>      }
>>>>>>>>>    })
>>>>>>>>>  }
>>>>>>>>> 
>>>>>>>>> It goes through and deletes the directories recursively.  I was thinking
>>>>>>>>> there might be some issues with concurrently-running shutdown hooks
>>>>>>>>> deleting things out from underneath each other (shutdown hook javadocs
>>>>>>>>> say they're all started in parallel if multiple hooks are added) causing
>>>>>>>>> the File.list() in that last thread to take quite some time.
>>>>>>>>>
>>>>>>>>> While I was looking through the stacktrace the JVM finally exited (after
>>>>>>>>> 15-20min at least) so I won't be able to debug more until this bug
>>>>>>>>> strikes again.
>>>>>>>>> 
>>>>>>>>> Any ideas on what might be going on here?
>>>>>>>>> 
>>>>>>>>> Thanks!
>>>>>>>>> Andrew
