An additional shutdown hook to stop the threadpool is much more elegant than the name matching and thread interrupting I was thinking about. From that Javadoc it looks like a best-effort shutdown that won't hard-kill threads, but that's at least a step forward from the current behavior.
http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ThreadPoolExecutor.html#shutdownNow()

On Fri, Feb 7, 2014 at 12:13 AM, Tathagata Das wrote:

Or we can try adding a shutdown hook in the Executor <https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L127> to call threadPool.shutdownNow(). We may have to catch the InterruptedException and handle it gracefully out here: <https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255>

TD

On Thu, Feb 6, 2014 at 11:49 PM, Andrew Ash wrote:

I think we can enumerate all current threads with the ThreadMXBean, filter to those threads with the name of the executor pool in them, and interrupt them.

http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29

The executor threads are currently named according to the pattern "Executor task launch worker-X".

On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das wrote:

That definitely sounds more reliable. Worth trying out if there is a reliable way of reproducing the deadlock-like scenario.

TD

On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia wrote:

I don't think we necessarily want to do this through the DAGScheduler, because the worker might also shut down due to some unusual termination condition, like the driver node crashing. Can't we do it at the top of the shutdown hook instead? If all the threads are in the same thread pool, it might be possible to interrupt or stop the whole pool.

Matei

On Feb 6, 2014, at 11:30 PM, Andrew Ash wrote:

That's genius.
Of course when a worker is told to shut down it should interrupt its worker threads -- I think that would address this issue.

Are you thinking to put

    running.map(_.jobId).foreach { handleJobCancellation }

at the top of the StopDAGScheduler block?

On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das wrote:

It's highly likely that the executor's threadpool that runs the tasks is the only set of threads that writes to disk. The tasks are designed to be interrupted when the corresponding job is cancelled. So a reasonably simple way could be to actually cancel the currently active jobs, which would send the signal to the worker to stop the tasks. Currently, the DAGScheduler <https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610> does not seem to actually cancel the jobs, only mark them as failed. So it may be a simple addition.

There may be some complications with the external spilling of shuffle data to disk not stopping immediately when the task is marked for killing. Gotta try it out.

TD

On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash wrote:

There is probably just one threadpool that has task threads -- is it possible to enumerate and interrupt just those? We may need to string a reference to that threadpool through to the shutdown thread to make that happen.
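
Andrew's question here (enumerate and interrupt just the task threads) and the ThreadMXBean approach in his 11:49 PM message can be sketched as follows. One caveat: ThreadMXBean returns ThreadInfo snapshots (id, name, state) rather than Thread objects, so this sketch swaps in Thread.getAllStackTraces to get references that can actually be interrupted. The "Executor task launch worker" prefix comes from the thread; the object and method names are hypothetical, not Spark code.

```scala
// Hypothetical sketch: InterruptByName is illustrative, not part of Spark.
object InterruptByName {
  /** Interrupts every live thread whose name starts with `prefix` (other than
    * the calling thread) and returns the names of the threads it interrupted. */
  def interruptThreadsNamed(prefix: String): Seq[String] = {
    // Thread.getAllStackTraces yields live Thread objects, which can be
    // interrupted; ThreadMXBean would only yield ThreadInfo snapshots.
    val live: Array[Thread] =
      Thread.getAllStackTraces.keySet.toArray(new Array[Thread](0))
    val targets = live.filter { t =>
      t.getName.startsWith(prefix) && (t ne Thread.currentThread)
    }
    // interrupt() sets the interrupt flag and wakes the thread from
    // sleep/wait/join; code that never checks the flag keeps running.
    targets.foreach(_.interrupt())
    targets.map(_.getName).toSeq
  }
}
```

Matching on names is fragile (any thread that happens to share the prefix gets interrupted), which is one reason stopping the pool itself through a reference to it was preferred later in the thread.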

On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan wrote:

Ideally, interrupting the thread writing to disk should be sufficient - though since we are in the middle of shutdown when this is happening, it is best-effort anyway.
Identifying which threads to interrupt will be interesting, since most of them are driven by threadpools and we can't list all threads and interrupt all of them!

Regards,
Mridul

On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash wrote:

I think the solution where we stop the writing threads and then let the deleting threads completely clean up is the best option, since the final state doesn't have half-deleted temp dirs scattered across the cluster.

How feasible do you think it'd be to interrupt the other threads?

On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan wrote:

Looks like a pathological corner case here - where the delete thread is not getting run while the OS is busy prioritizing the thread writing data (probably with heavy GC too).
Ideally, the delete thread would list files, remove them and then fail when it tries to remove the non-empty directory (since another thread might be creating more in parallel).

Regards,
Mridul

On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash wrote:

Got a repro locally on my MBP (the other was on a CentOS machine).

Build Spark, run a master and a worker with the sbin/start-all.sh script, then run this in a shell:

    import org.apache.spark.storage.StorageLevel._
    val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER)
    s.count

After about a minute, this line appears in the shell logging output:

    14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent heart beats: 57510ms exceeds 45000ms

Ctrl-C the shell. In jps there is now a worker, a master, and a CoarseGrainedExecutorBackend.

Running jstack on the CGEBackend JVM gave me the attached stacktraces. I waited around for 15 min, then kill -9'd the JVM and restarted the process.

I wonder if what's happening here is that the threads that are spewing data to disk (as that parallelize and persist would do) can write to disk faster than the cleanup threads can delete from disk.

What do you think of that theory?

Andrew

On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan wrote:

Shutdown hooks should not take 15 mins as you mentioned!
On the other hand, how busy was your disk when this was happening? (Either due to Spark or something else?)

It might just be that there was a lot of stuff to remove?

Regards,
Mridul

On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash wrote:

Hi Spark devs,

Occasionally when hitting Ctrl-C in the Scala Spark shell on 0.9.0, one of my workers goes dead in the Spark master UI. I'm using the standalone cluster and didn't ever see this while using 0.8.0, so I think it may be a regression.

When I prod the hung CoarseGrainedExecutorBackend JVM with jstack and jmap -heap, it doesn't respond unless I add the -F force flag. The heap isn't full, but there are some interesting bits in the jstack. Poking around a little, I think there may be some kind of deadlock in the shutdown hooks.

Below are the threads I think are most interesting:

    Thread 14308: (state = BLOCKED)
     - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
     - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
     - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
     - org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object, scala.Function1) @bci=352, line=81 (Interpreted frame)
     - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498 (Interpreted frame)
     - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456 (Interpreted frame)
     - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237 (Interpreted frame)
     - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
     - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec() @bci=4, line=386 (Interpreted frame)
     - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260 (Compiled frame)
     - scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask) @bci=10, line=1339 (Compiled frame)
     - scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue) @bci=11, line=1979 (Compiled frame)
     - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107 (Interpreted frame)

    Thread 3865: (state = BLOCKED)
     - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
     - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
     - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
     - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106 (Interpreted frame)
     - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted frame)
     - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
     - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
     - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
     - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52 (Interpreted frame)
     - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
     - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)

    Thread 3987: (state = BLOCKED)
     - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
     - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
     - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
     - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1, line=466 (Interpreted frame)
     - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9, line=478 (Compiled frame)
     - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File) @bci=4, line=479 (Compiled frame)
     - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object) @bci=5, line=478 (Compiled frame)
     - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
     - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2, line=34 (Compiled frame)
     - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19, line=478 (Interpreted frame)
     - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File) @bci=14, line=141 (Interpreted frame)
     - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object) @bci=5, line=139 (Interpreted frame)
     - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
     - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Interpreted frame)
     - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39, line=139 (Interpreted frame)

I think what happened here is that thread 14308 received the Akka "shutdown" message and called System.exit(). This started thread 3865, which is the JVM shutting itself down. Part of that process is running the shutdown hooks, so it started thread 3987. That thread is the shutdown hook from addShutdownHook() in DiskBlockManager.scala, which looks like this:

    private def addShutdownHook() {
      localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
      Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
        override def run() {
          logDebug("Shutdown hook called")
          localDirs.foreach { localDir =>
            try {
              if (!Utils.hasRootAsShutdownDeleteDir(localDir))
                Utils.deleteRecursively(localDir)
            } catch {
              case t: Throwable =>
                logError("Exception while deleting local spark dir: " + localDir, t)
            }
          }

          if (shuffleSender != null) {
            shuffleSender.stop()
          }
        }
      })
    }

It goes through and deletes the directories recursively. I was thinking there might be some issues with concurrently running shutdown hooks deleting things out from underneath each other (the shutdown hook Javadocs say they're all started in parallel if multiple hooks are added), causing the File.list() in that last thread to take quite some time.

While I was looking through the stacktrace the JVM finally exited (after 15-20 min at least), so I won't be able to debug more until this bug strikes again.

Any ideas on what might be going on here?

Thanks!
Andrew
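
The approach this thread converges on (stop the task threads first, then let the cleanup delete the local dirs) can be sketched as a single shutdown hook, under a few assumptions. Per the JDK Javadocs, Runtime.addShutdownHook starts registered hooks in an unspecified order and runs them concurrently, so "stop writers, then delete" has to happen inside one hook; ThreadPoolExecutor.shutdownNow only makes best-effort attempts, interrupting running tasks via Thread.interrupt(); and a java.io write does not check the interrupt flag, so a task has to check it itself. The delete step follows Mridul's suggestion of failing on a still-non-empty directory rather than chasing newly created files. Everything named below (OrderedShutdown, writerTask, stopThenDelete, installHook, the timeouts) is hypothetical and is not Spark's Executor or DiskBlockManager code.

```scala
import java.io.{File, FileOutputStream, IOException}
import java.nio.file.Files
import java.util.concurrent.{ExecutorService, TimeUnit}

// Hypothetical sketch: names below are illustrative, not Spark's API.
object OrderedShutdown {

  /** A stand-in for a task that spills 4 KB files into `dir` until interrupted. */
  def writerTask(dir: File): Runnable = new Runnable {
    def run(): Unit = {
      // java.io writes ignore interrupts, so the loop checks the flag itself.
      while (!Thread.currentThread.isInterrupted) {
        val out = new FileOutputStream(File.createTempFile("block-", ".data", dir))
        try out.write(new Array[Byte](4096)) finally out.close()
        try {
          Thread.sleep(5)
        } catch {
          // Handle the interrupt gracefully: restore the flag so the loop exits.
          case _: InterruptedException => Thread.currentThread.interrupt()
        }
      }
    }
  }

  /** Deletes the listed entries, then `f` itself. If a concurrent writer added
    * files after the listing, the final delete fails instead of looping. */
  def deleteRecursively(f: File): Unit = {
    if (f.isDirectory && !Files.isSymbolicLink(f.toPath)) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    // File.delete returns false (e.g. non-empty dir); a missing file is fine.
    if (!f.delete() && f.exists()) {
      throw new IOException("Failed to delete: " + f.getAbsolutePath)
    }
  }

  /** Stops the writers, then deletes `dir`. Returns true if the pool
    * terminated within the timeout; throws IOException if deletion fails. */
  def stopThenDelete(pool: ExecutorService, dir: File, timeoutMs: Long): Boolean = {
    pool.shutdownNow() // best effort: interrupts running tasks, drops queued ones
    val stopped = pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)
    deleteRecursively(dir)
    stopped
  }

  /** Registers ONE hook for both steps, because separately registered
    * hooks would run concurrently in an unspecified order. */
  def installHook(pool: ExecutorService, dir: File): Unit =
    Runtime.getRuntime.addShutdownHook(new Thread("stop task threads, then delete local dirs") {
      override def run(): Unit = {
        try {
          stopThenDelete(pool, dir, 10000L)
        } catch {
          case e: IOException => System.err.println("Cleanup failed: " + e.getMessage)
        }
      }
    })
}
```

The timeout bounds how long shutdown waits for tasks that never check their interrupt flag; if it expires, the delete may still race with a straggling writer, which is the case where failing on a non-empty directory beats retrying indefinitely.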
