There is probably just one threadpool that has task threads. Is it possible to enumerate and interrupt just those? We may need to keep a reference to that threadpool through to the shutdown thread to make that happen.
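A minimal sketch of that idea follows. It assumes the executor can hand its task pool, as a `java.util.concurrent.ExecutorService`, to the shutdown hook. `TaskPoolShutdown`, `stopTasksThenDelete`, `install`, and the `delete` parameter are hypothetical names, not Spark's API:

```scala
import java.io.File
import java.util.concurrent.{ExecutorService, TimeUnit}

// Hypothetical helper (not Spark's API). Assumes the one pool that runs tasks
// is passed to the shutdown hook, so only those threads get interrupted.
object TaskPoolShutdown {

  /** Interrupts the task threads, waits up to timeoutMs for them to exit,
    * then deletes the local dirs. Returns true if every task thread stopped. */
  def stopTasksThenDelete(taskPool: ExecutorService, localDirs: Seq[File],
                          timeoutMs: Long)(delete: File => Unit): Boolean = {
    // shutdownNow() rejects new tasks and calls Thread.interrupt() on each
    // thread of this pool that is currently running a task.
    taskPool.shutdownNow()
    // An interrupt is only a request; bound the wait so the hook cannot hang.
    val stopped = taskPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)
    // Delete even if some writers are still running (best effort).
    localDirs.foreach(delete)
    stopped
  }

  /** Registers the hook; `delete` is whatever recursive delete the caller uses. */
  def install(taskPool: ExecutorService, localDirs: Seq[File], delete: File => Unit): Unit =
    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
      override def run(): Unit = {
        stopTasksThenDelete(taskPool, localDirs, 10000L)(delete)
        ()
      }
    })
}
```

`shutdownNow()` interrupts only the threads of that one pool. This sidesteps the concern that we can't list and interrupt every thread in the JVM. An interrupt does not abort a plain `java.io` stream write, though. A task stops only once it checks its interrupt status or enters an interruptible call. That is why the wait is bounded.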
On Thu, Feb 6, 2014 at 10:36 PM, Mridul Muralidharan wrote:
> Ideally, interrupting the thread writing to disk should be sufficient,
> though since we are in the middle of shutdown when this is happening, it
> is best-effort anyway.
> Identifying which threads to interrupt will be interesting, since most
> of them are driven by threadpools and we can't list all threads and
> interrupt all of them!
>
> Regards,
> Mridul
>
> On Fri, Feb 7, 2014 at 5:57 AM, Andrew Ash wrote:
> > I think the solution where we stop the writing threads and then let the
> > deleting threads completely clean up is the best option, since the final
> > state doesn't have half-deleted temp dirs scattered across the cluster.
> >
> > How feasible do you think it'd be to interrupt the other threads?
> >
> > On Thu, Feb 6, 2014 at 10:54 AM, Mridul Muralidharan wrote:
> >
> >> Looks like a pathological corner case here, where the delete
> >> thread is not getting run while the OS is busy prioritizing the thread
> >> writing data (probably with heavy GC too).
> >> Ideally, the delete thread would list files, remove them, and then fail
> >> when it tries to remove the non-empty directory (since another thread
> >> might be creating more in parallel).
> >>
> >> Regards,
> >> Mridul
> >>
> >> On Thu, Feb 6, 2014 at 4:19 PM, Andrew Ash wrote:
> >> > Got a repro locally on my MBP (the other was on a CentOS machine).
> >> >
> >> > Build Spark, run a master and a worker with the sbin/start-all.sh script,
> >> > then run this in a shell:
> >> >
> >> > import org.apache.spark.storage.StorageLevel._
> >> > val s = sc.parallelize(1 to 1000000000).persist(MEMORY_AND_DISK_SER);
> >> > s.count
> >> >
> >> > After about a minute, this line appears in the shell logging output:
> >> >
> >> > 14/02/06 02:44:44 WARN BlockManagerMasterActor: Removing BlockManager BlockManagerId(0, aash-mbp.dyn.yojoe.local, 57895, 0) with no recent heart beats: 57510ms exceeds 45000ms
> >> >
> >> > Ctrl-C the shell. In jps there is now a worker, a master, and a
> >> > CoarseGrainedExecutorBackend.
> >> >
> >> > I ran jstack on the CGEBackend JVM and got the attached stacktraces. I
> >> > waited around for 15 minutes, then kill -9'd the JVM and restarted the process.
> >> >
> >> > I wonder if what's happening here is that the threads that are spewing data
> >> > to disk (as that parallelize and persist would do) can write to disk faster
> >> > than the cleanup threads can delete from disk.
> >> >
> >> > What do you think of that theory?
> >> >
> >> > Andrew
> >> >
> >> > On Thu, Feb 6, 2014 at 2:30 AM, Mridul Muralidharan wrote:
> >> >>
> >> >> Shutdown hooks should not take 15 minutes, as you mentioned!
> >> >> On the other hand, how busy was your disk when this was happening
> >> >> (either due to Spark or something else)?
> >> >>
> >> >> It might just be that there was a lot of stuff to remove?
> >> >>
> >> >> Regards,
> >> >> Mridul
> >> >>
> >> >> On Thu, Feb 6, 2014 at 3:50 PM, Andrew Ash wrote:
> >> >> > Hi Spark devs,
> >> >> >
> >> >> > Occasionally, when hitting Ctrl-C in the Scala Spark shell on 0.9.0, one of
> >> >> > my workers goes dead in the Spark master UI.
> >> >> > I'm using the standalone cluster and never saw this while using 0.8.0, so I
> >> >> > think it may be a regression.
> >> >> >
> >> >> > When I prod the hung CoarseGrainedExecutorBackend JVM with jstack and
> >> >> > jmap -heap, it doesn't respond unless I add the -F force flag. The heap
> >> >> > isn't full, but there are some interesting bits in the jstack. Poking
> >> >> > around a little, I think there may be some kind of deadlock in the
> >> >> > shutdown hooks.
> >> >> >
> >> >> > Below are the threads I think are most interesting:
> >> >> >
> >> >> > Thread 14308: (state = BLOCKED)
> >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >> >> >  - java.lang.Runtime.exit(int) @bci=14, line=109 (Interpreted frame)
> >> >> >  - java.lang.System.exit(int) @bci=4, line=962 (Interpreted frame)
> >> >> >  - org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(java.lang.Object, scala.Function1) @bci=352, line=81 (Interpreted frame)
> >> >> >  - akka.actor.ActorCell.receiveMessage(java.lang.Object) @bci=25, line=498 (Interpreted frame)
> >> >> >  - akka.actor.ActorCell.invoke(akka.dispatch.Envelope) @bci=39, line=456 (Interpreted frame)
> >> >> >  - akka.dispatch.Mailbox.processMailbox(int, long) @bci=24, line=237 (Interpreted frame)
> >> >> >  - akka.dispatch.Mailbox.run() @bci=20, line=219 (Interpreted frame)
> >> >> >  - akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec() @bci=4, line=386 (Interpreted frame)
> >> >> >  - scala.concurrent.forkjoin.ForkJoinTask.doExec() @bci=10, line=260 (Compiled frame)
> >> >> >  - scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(scala.concurrent.forkjoin.ForkJoinTask) @bci=10, line=1339 (Compiled frame)
> >> >> >  - scala.concurrent.forkjoin.ForkJoinPool.runWorker(scala.concurrent.forkjoin.ForkJoinPool$WorkQueue) @bci=11, line=1979 (Compiled frame)
> >> >> >  - scala.concurrent.forkjoin.ForkJoinWorkerThread.run() @bci=14, line=107 (Interpreted frame)
> >> >> >
> >> >> > Thread 3865: (state = BLOCKED)
> >> >> >  - java.lang.Object.wait(long) @bci=0 (Interpreted frame)
> >> >> >  - java.lang.Thread.join(long) @bci=38, line=1280 (Interpreted frame)
> >> >> >  - java.lang.Thread.join() @bci=2, line=1354 (Interpreted frame)
> >> >> >  - java.lang.ApplicationShutdownHooks.runHooks() @bci=87, line=106 (Interpreted frame)
> >> >> >  - java.lang.ApplicationShutdownHooks$1.run() @bci=0, line=46 (Interpreted frame)
> >> >> >  - java.lang.Shutdown.runHooks() @bci=39, line=123 (Interpreted frame)
> >> >> >  - java.lang.Shutdown.sequence() @bci=26, line=167 (Interpreted frame)
> >> >> >  - java.lang.Shutdown.exit(int) @bci=96, line=212 (Interpreted frame)
> >> >> >  - java.lang.Terminator$1.handle(sun.misc.Signal) @bci=8, line=52 (Interpreted frame)
> >> >> >  - sun.misc.Signal$1.run() @bci=8, line=212 (Interpreted frame)
> >> >> >  - java.lang.Thread.run() @bci=11, line=744 (Interpreted frame)
> >> >> >
> >> >> > Thread 3987: (state = BLOCKED)
> >> >> >  - java.io.UnixFileSystem.list(java.io.File) @bci=0 (Interpreted frame)
> >> >> >  - java.io.File.list() @bci=29, line=1116 (Interpreted frame)
> >> >> >  - java.io.File.listFiles() @bci=1, line=1201 (Compiled frame)
> >> >> >  - org.apache.spark.util.Utils$.listFilesSafely(java.io.File) @bci=1, line=466 (Interpreted frame)
> >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=9, line=478 (Compiled frame)
> >> >> >  - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.io.File) @bci=4, line=479 (Compiled frame)
> >> >> >  - org.apache.spark.util.Utils$$anonfun$deleteRecursively$1.apply(java.lang.Object) @bci=5, line=478 (Compiled frame)
> >> >> >  - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
> >> >> >  - scala.collection.mutable.WrappedArray.foreach(scala.Function1) @bci=2, line=34 (Compiled frame)
> >> >> >  - org.apache.spark.util.Utils$.deleteRecursively(java.io.File) @bci=19, line=478 (Interpreted frame)
> >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.io.File) @bci=14, line=141 (Interpreted frame)
> >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1$$anonfun$run$2.apply(java.lang.Object) @bci=5, line=139 (Interpreted frame)
> >> >> >  - scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized, scala.Function1) @bci=22, line=33 (Compiled frame)
> >> >> >  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1) @bci=2, line=108 (Interpreted frame)
> >> >> >  - org.apache.spark.storage.DiskBlockManager$$anon$1.run() @bci=39, line=139 (Interpreted frame)
> >> >> >
> >> >> > I think what happened here is that thread 14308 received the akka
> >> >> > "shutdown" message and called System.exit(). This started thread 3865,
> >> >> > which is the JVM shutting itself down. Part of that process is running the
> >> >> > shutdown hooks, so it started thread 3987.
> >> >> > That thread is the shutdown hook from addShutdownHook() in
> >> >> > DiskBlockManager.scala, which looks like this:
> >> >> >
> >> >> >   private def addShutdownHook() {
> >> >> >     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
> >> >> >     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
> >> >> >       override def run() {
> >> >> >         logDebug("Shutdown hook called")
> >> >> >         localDirs.foreach { localDir =>
> >> >> >           try {
> >> >> >             if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
> >> >> >           } catch {
> >> >> >             case t: Throwable =>
> >> >> >               logError("Exception while deleting local spark dir: " + localDir, t)
> >> >> >           }
> >> >> >         }
> >> >> >
> >> >> >         if (shuffleSender != null) {
> >> >> >           shuffleSender.stop()
> >> >> >         }
> >> >> >       }
> >> >> >     })
> >> >> >   }
> >> >> >
> >> >> > It goes through and deletes the directories recursively. I was thinking
> >> >> > there might be some issues with concurrently running shutdown hooks
> >> >> > deleting things out from underneath each other (the shutdown hook javadocs
> >> >> > say they're all started in parallel if multiple hooks are added), causing
> >> >> > the File.list() in that last thread to take quite some time.
> >> >> >
> >> >> > While I was looking through the stacktrace, the JVM finally exited (after
> >> >> > 15-20 minutes at least), so I won't be able to debug further until this bug
> >> >> > strikes again.
> >> >> >
> >> >> > Any ideas on what might be going on here?
> >> >> >
> >> >> > Thanks!
> >> >> > Andrew
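Mridul suggests earlier in the thread that the delete thread should list the files once, remove them, and then fail on a directory that is no longer empty. That can be sketched as follows. `SinglePassDelete` and `deleteOnce` are hypothetical names; this is not Spark's `Utils.deleteRecursively`. The symlink check is an added assumption, not something the thread discusses.

```scala
import java.io.File
import java.nio.file.Files

// Hypothetical sketch of "list once, remove, then fail on a non-empty dir".
// Not Spark's Utils.deleteRecursively.
object SinglePassDelete {

  /** Returns true if `file` and everything under it was removed. */
  def deleteOnce(file: File): Boolean = {
    // Assumption: don't follow symlinks into other directories; delete the link itself.
    if (file.isDirectory && !Files.isSymbolicLink(file.toPath)) {
      // List exactly once; listFiles() returns null on an I/O error.
      val children = Option(file.listFiles()).getOrElse(Array.empty[File])
      // Attempt every child even if an earlier one fails.
      val childrenOk = children.map(deleteOnce).forall(ok => ok)
      // File.delete() returns false for a non-empty directory, e.g. if another
      // thread created entries after the listing; give up instead of retrying.
      val dirOk = file.delete()
      childrenOk && dirOk
    } else {
      // Treat a file that vanished concurrently as already deleted.
      file.delete() || !file.exists()
    }
  }
}
```

The sketch reports failure by returning `false` instead of throwing, and it keeps going through sibling entries after one fails. If a writer is still running, this can leave a partially deleted directory behind. That is the half-deleted-temp-dir outcome Andrew would rather avoid by stopping the writers first.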
