This looks like the most reasonable approach to resolve this!

Regards,
Mridul

On Fri, Feb 7, 2014 at 1:43 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Or we can try adding a shutdown hook in the Executor
<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L127>
to call threadPool.shutdownNow(). We may have to catch the resulting
InterruptedException and handle it gracefully here
<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala?source=c#L255>.

TD
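A minimal sketch of what that could look like, assuming the `threadPool`
field defined in Executor.scala and the TaskRunner.run() body around the
linked line; the hook's thread name and log message are illustrative:

    // Hypothetical sketch -- registered in the Executor's constructor.
    Runtime.getRuntime.addShutdownHook(new Thread("shutdown Executor threadPool") {
      override def run() {
        threadPool.shutdownNow()  // interrupts every running task thread
      }
    })

    // ...and in TaskRunner.run(), handle the interrupt gracefully instead
    // of letting it surface as a generic task failure:
    try {
      // existing task execution code
    } catch {
      case _: InterruptedException =>
        logInfo("Task thread interrupted during JVM shutdown; exiting cleanly")
    }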
On Thu, Feb 6, 2014 at 11:49 PM, Andrew Ash <and...@andrewash.com> wrote:

I think we can enumerate all current threads with the ThreadMXBean, filter
to those threads with the name of the executor pool in them, and interrupt
them.

http://docs.oracle.com/javase/6/docs/api/java/lang/management/ManagementFactory.html#getThreadMXBean%28%29

The executor threads are currently named according to the pattern
"Executor task launch worker-X".

On Thu, Feb 6, 2014 at 11:45 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

That definitely sounds more reliable. Worth trying out if there is a
reliable way of reproducing the deadlock-like scenario.

TD

On Thu, Feb 6, 2014 at 11:38 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

I don't think we necessarily want to do this through the DAGScheduler,
because the worker might also shut down due to some unusual termination
condition, like the driver node crashing. Can't we do it at the top of the
shutdown hook instead? If all the threads are in the same thread pool, it
might be possible to interrupt or stop the whole pool.

Matei

On Feb 6, 2014, at 11:30 PM, Andrew Ash <and...@andrewash.com> wrote:

That's genius. Of course when a worker is told to shut down it should
interrupt its worker threads -- I think that would address this issue.

Are you thinking to put

    running.map(_.jobId).foreach { handleJobCancellation }

at the top of the StopDAGScheduler block?

On Thu, Feb 6, 2014 at 11:05 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

It's highly likely that the executor threads in the thread pool that runs
the tasks are the only set of threads that write to disk. The tasks are
designed to be interrupted when the corresponding job is cancelled. So a
reasonably simple way could be to actually cancel the currently active
jobs, which would send the signal to the worker to stop the tasks.
Currently, the DAGScheduler
<https://github.com/apache/incubator-spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L610>
does not seem to actually cancel the jobs, only mark them as failed, so it
may be a simple addition.

There may be some complications with the external spilling of shuffle data
to disk not stopping immediately when the task is marked for killing.
Gotta try it out.

TD
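A rough sketch of that addition, assuming the `running` set of active jobs
and handleJobCancellation() in DAGScheduler.scala; the surrounding match
structure is assumed and only the foreach line is new:

    // Hypothetical fragment of DAGScheduler's event handling.
    case StopDAGScheduler =>
      // Cancel every active job first, so executors get the kill signal
      // and interrupt their task threads, rather than only marking the
      // jobs as failed.
      running.map(_.jobId).foreach { handleJobCancellation }
      // ... existing StopDAGScheduler teardown continues here ...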
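On the thread-enumeration idea earlier in the thread: one wrinkle is that
ThreadMXBean only hands back ThreadInfo objects, which cannot be
interrupted directly, so a sketch would instead walk the live Thread
references from Thread.getAllStackTraces and filter on the executor pool's
thread-name pattern:

    import scala.collection.JavaConverters._

    // Hypothetical sketch -- method name is illustrative.
    def interruptTaskThreads() {
      Thread.getAllStackTraces.keySet.asScala
        .filter(_.getName.contains("Executor task launch worker"))
        .foreach(_.interrupt())
    }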
On Thu, Feb 6, 2014 at 10:39 PM, Andrew Ash <and...@andrewash.com> wrote:

There is probably just one threadpool that has task threads -- is it
possible to enumerate and interrupt just those? We may need to keep a
reference to that threadpool through to the shutdown thread to make that
happen.

On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan <mri...@gmail.com> wrote:

Ideally, interrupting the thread writing to disk should be sufficient --
though since we are in the middle of shutdown when this happens, it is a
best-effort attempt anyway. Identifying which threads to interrupt will be
interesting, since most of them are driven by thread pools and we can't
just list all threads and interrupt all of them!

Regards,
Mridul

On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash <and...@andrewash.com> wrote:

I think the solution where we stop the writing threads and then let the
deleting threads completely clean up is the best option, since the final
state doesn't have half-deleted temp dirs scattered across the cluster.

How feasible do you think it'd be to interrupt the other threads?

On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan <mri...@gmail.com> wrote:

Looks like a pathological corner case here, where the delete thread is not
getting run while the OS is busy prioritizing the thread writing data
(probably with heavy GC too). Ideally, the delete thread would list files,
remove them, and then fail when it tries to remove the now non-empty
directory (since the other thread might be creating more files in
parallel).

Regards,
Mridul
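A sketch of that behavior, as a simplified stand-in for
Utils.deleteRecursively (not the actual implementation): the delete races
a concurrent writer, so the final removal of the directory itself can
legitimately fail and should be surfaced rather than retried forever:

    import java.io.{File, IOException}

    // Simplified stand-in illustrating the race: children are listed and
    // removed, but a concurrent writer may create new files in between,
    // so deleting the directory itself can fail.
    def deleteRecursively(file: File) {
      if (file.isDirectory) {
        val children = file.listFiles()
        if (children != null) {
          children.foreach(deleteRecursively)
        }
      }
      if (!file.delete() && file.exists()) {
        // Another thread wrote into this directory between list and delete.
        throw new IOException("Failed to delete (possibly being written to): " + file)
      }
    }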
On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash <and...@andrewash.com> wrote:

Got a repro locally on my MBP (the other was on a CentOS machine).

Build Spark, run a master and a worker with the sbin/start-all.sh script,
then run this in a shell:

    import org.apache.spark.storage.StorageLevel._
    val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER)
    s.count

After about a minute, this line appears in the shell logging output:

    14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager
    BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent
    heart beats: 57510ms exceeds 45000ms

Ctrl-C the shell. In jps there is now a worker, a master, and a
CoarseGrainedExecutorBackend.

Run jstack on the CGEBackend JVM, and I got the attached stacktraces. I
waited around for 15 min, then kill -9'd the JVM and restarted the
process.

I wonder if what's happening here is that the threads that are spewing
data to disk (as that parallelize and persist would do) can write to disk
faster than the cleanup threads can delete from disk.

What do you think of that theory?

Andrew

On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan <mri...@gmail.com> wrote:

Shutdown hooks should not take 15 mins as you mentioned! On the other
hand, how busy was your disk when this was happening? (Either due to Spark
or something else?)

It might just be that there was a lot of stuff to remove?

Regards,
Mridul

On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash <and...@andrewash.com> wrote:

Hi Spark devs,

Occasionally when hitting Ctrl-C in the Scala spark shell on 0.9.0, one of
my workers goes dead in the Spark master UI. I'm using the standalone
cluster and didn't ever see this while using 0.8.0, so I think it may be a
regression.

When I prod on the hung CoarseGrainedExecutorBackend JVM with jstack and
jmap -heap, it doesn't respond unless I add the -F force flag. The heap
isn't full, but there are some interesting bits in the jstack. Poking
around a little, I think there may be some kind of deadlock in the
shutdown hooks.
Below are the threads I think are most interesting:

Thread 14308: (state = BLOCKED)
 - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
 - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
 - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
 - org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object, scala.Function1) @bci=352, line=81 (Interpreted frame)
 - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498 (Interpreted frame)
 - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456 (Interpreted frame)
 - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237 (Interpreted frame)
 - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
 - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec() @bci=4, line=386 (Interpreted frame)
 - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260 (Compiled frame)
 - scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask) @bci=10, line=1339 (Compiled frame)
 - scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue) @bci=11, line=1979 (Compiled frame)
 - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107 (Interpreted frame)

Thread 3865: (state = BLOCKED)
 - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
 - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
 - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
 - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106 (Interpreted frame)
 - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted frame)
 - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
 - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
 - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
 - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52 (Interpreted frame)
 - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
 - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
Thread 3987: (state = BLOCKED)
 - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
 - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
 - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
 - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1, line=466 (Interpreted frame)
 - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9, line=478 (Compiled frame)
 - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File) @bci=4, line=479 (Compiled frame)
 - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object) @bci=5, line=478 (Compiled frame)
 - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
 - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2, line=34 (Compiled frame)
 - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19, line=478 (Interpreted frame)
 - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File) @bci=14, line=141 (Interpreted frame)
 - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object) @bci=5, line=139 (Interpreted frame)
 - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
 - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Interpreted frame)
 - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39, line=139 (Interpreted frame)

I think what happened here is that thread 14308 received the akka
"shutdown" message and called System.exit(). This started thread 3865,
which is the JVM shutting itself down. Part of that process is running the
shutdown hooks, so it started thread 3987.
That thread is the shutdown hook from addShutdownHook() in
DiskBlockManager.scala, which looks like this:

    private def addShutdownHook() {
      localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
      Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
        override def run() {
          logDebug("Shutdown hook called")
          localDirs.foreach { localDir =>
            try {
              if (!Utils.hasRootAsShutdownDeleteDir(localDir)) {
                Utils.deleteRecursively(localDir)
              }
            } catch {
              case t: Throwable =>
                logError("Exception while deleting local spark dir: " + localDir, t)
            }
          }

          if (shuffleSender != null) {
            shuffleSender.stop()
          }
        }
      })
    }

It goes through and deletes the directories recursively. I was thinking
there might be some issues with concurrently-running shutdown hooks
deleting things out from underneath each other (the shutdown hook javadocs
say all hooks are started in parallel if multiple hooks are added),
causing the File.list() in that last thread to take quite some time.

While I was looking through the stacktrace, the JVM finally exited (after
15-20 min at least), so I won't be able to debug more until this bug
strikes again.

Any ideas on what might be going on here?

Thanks!
Andrew
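For reference, a tiny standalone demo of the parallel-hook behavior the
javadocs describe (hypothetical demo code, not Spark code): both hooks are
started concurrently when the JVM begins shutting down, so two hooks
touching the same directories can indeed race each other.

    object ShutdownHookDemo {
      def main(args: Array[String]) {
        // Per the java.lang.Runtime javadocs, all registered hooks are
        // started concurrently during shutdown, in no particular order.
        Runtime.getRuntime.addShutdownHook(new Thread("hook-a") {
          override def run() { println("hook-a running") }
        })
        Runtime.getRuntime.addShutdownHook(new Thread("hook-b") {
          override def run() { println("hook-b running") }
        })
        sys.exit(0)  // triggers both hooks, racing each other
      }
    }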