Repository: spark
Updated Branches:
  refs/heads/branch-1.5 8537e51d3 -> 74c9dcec3


[SPARK-9826] [CORE] Fix cannot use custom classes in log4j.properties

Refactor Utils class and create ShutdownHookManager.

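For reference, a minimal sketch of how call sites change with the new API (the
object is private[spark], so this only compiles from Spark-internal code; the
package and object names below are hypothetical):

package org.apache.spark.examples

import org.apache.spark.util.ShutdownHookManager

object ShutdownHookExample {
  def main(args: Array[String]): Unit = {
    // Previously: Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { ... }
    val ref = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
      println("running just before the JVM exits")
    }
    // The returned handle can be used to unregister the hook again.
    ShutdownHookManager.removeShutdownHook(ref)
  }
}
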
NOTE: I wasn't able to run ./dev/run-tests on a Windows machine.
Manual tests were conducted locally using a custom log4j.properties file with a
Redis appender and Logstash formatter (bundled in the fat jar submitted to
Spark).

Example log4j.properties used for the manual test:
log4j.rootCategory=WARN,console,redis
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.spark.graphx.Pregel=INFO

log4j.appender.redis=com.ryantenney.log4j.FailoverRedisAppender
log4j.appender.redis.endpoints=hostname:port
log4j.appender.redis.key=mykey
log4j.appender.redis.alwaysBatch=false
log4j.appender.redis.layout=net.logstash.log4j.JSONEventLayoutV1

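As a reference point, here is a hypothetical minimal appender of the same
general shape (class name, package, and the "prefix" property are invented for
illustration; the manual test used FailoverRedisAppender and JSONEventLayoutV1
as shown above). Bundling a class like this in the application fat jar and
referencing it from log4j.properties is exactly the case this change fixes:

package com.example.logging

import org.apache.log4j.AppenderSkeleton
import org.apache.log4j.spi.LoggingEvent

// Referenced from log4j.properties, e.g.:
//   log4j.appender.custom=com.example.logging.StderrAppender
//   log4j.appender.custom.prefix=[myapp]
class StderrAppender extends AppenderSkeleton {

  // log4j populates bean-style properties from the config via this setter.
  private var prefix: String = ""
  def setPrefix(p: String): Unit = { prefix = p }

  // Called for every log event once log4j has loaded this class from the fat jar.
  override def append(event: LoggingEvent): Unit = {
    System.err.println(
      s"$prefix ${event.getLevel} ${event.getLoggerName}: ${event.getRenderedMessage}")
  }

  override def close(): Unit = ()                 // nothing to release
  override def requiresLayout(): Boolean = false  // this appender formats events itself
}
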
Author: michellemay <mle...@gmail.com>

Closes #8109 from michellemay/SPARK-9826.

(cherry picked from commit ab7e721cfec63155641e81e72b4ad43cf6a7d4c7)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/74c9dcec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/74c9dcec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/74c9dcec

Branch: refs/heads/branch-1.5
Commit: 74c9dcec34214213955baa23098d7d23bc733346
Parents: 8537e51
Author: Michel Lemay <mle...@gmail.com>
Authored: Wed Aug 12 16:17:58 2015 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Aug 12 16:41:58 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   5 +-
 .../spark/deploy/history/HistoryServer.scala    |   4 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |   7 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   4 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   4 +-
 .../org/apache/spark/rdd/SqlNewHadoopRDD.scala  |   4 +-
 .../apache/spark/storage/DiskBlockManager.scala |  10 +-
 .../spark/storage/TachyonBlockManager.scala     |   6 +-
 .../apache/spark/util/ShutdownHookManager.scala | 266 +++++++++++++++++++
 .../util/SparkUncaughtExceptionHandler.scala    |   2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 222 +---------------
 .../hive/thriftserver/HiveThriftServer2.scala   |   4 +-
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |   4 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |   4 +-
 .../spark/streaming/StreamingContext.scala      |   8 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |   5 +-
 16 files changed, 307 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 207a0c1..2e01a9a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -563,7 +563,8 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
     // Make sure the context is stopped if the user forgets about it. This 
avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't 
help if the JVM
     // is killed, though.
-    _shutdownHookRef = 
Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
+      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
       logInfo("Invoking stop() from shutdown hook")
       stop()
     }
@@ -1671,7 +1672,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
       return
     }
     if (_shutdownHookRef != null) {
-      Utils.removeShutdownHook(_shutdownHookRef)
+      ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
     }
 
     Utils.tryLogNonFatalError {

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index a076a9c..d4f327c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.status.api.v1.{ApiRootResource, 
ApplicationInfo, Applica
   UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils}
 
 /**
  * A web server that renders SparkUIs of completed applications.
@@ -238,7 +238,7 @@ object HistoryServer extends Logging {
     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
 
-    Utils.addShutdownHook { () => server.stop() }
+    ShutdownHookManager.addShutdownHook { () => server.stop() }
 
     // Wait until the end of the world... or if the HistoryServer process is 
manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 29a5042..ab3fea4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.{SecurityManager, SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.util.logging.FileAppender
 
 /**
@@ -70,7 +70,8 @@ private[deploy] class ExecutorRunner(
     }
     workerThread.start()
     // Shutdown hook that kills actors on shutdown.
-    shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker 
shutting down")) }
+    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
+      killProcess(Some("Worker shutting down")) }
   }
 
   /**
@@ -102,7 +103,7 @@ private[deploy] class ExecutorRunner(
       workerThread = null
       state = ExecutorState.KILLED
       try {
-        Utils.removeShutdownHook(shutdownHook)
+        ShutdownHookManager.removeShutdownHook(shutdownHook)
       } catch {
         case e: IllegalStateException => None
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index f1c1736..e1f8719 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, NextIterator, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, 
NextIterator, Utils}
 import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
 import org.apache.spark.storage.StorageLevel
 
@@ -274,7 +274,7 @@ class HadoopRDD[K, V](
           }
         } catch {
           case e: Exception => {
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index f83a051..6a9c004 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, 
Utils}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.StorageLevel
 
@@ -186,7 +186,7 @@ class NewHadoopRDD[K, V](
           }
         } catch {
           case e: Exception => {
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 6a95e44..fa3fecc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.{Partition => SparkPartition, _}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, 
Utils}
 
 
 private[spark] class SqlNewHadoopPartition(
@@ -212,7 +212,7 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
           }
         } catch {
           case e: Exception =>
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 56a33d5..3f8d26e 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -22,7 +22,7 @@ import java.io.{IOException, File}
 
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * Creates and maintains the logical mapping between logical blocks and 
physical on-disk
@@ -144,7 +144,7 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
   }
 
   private def addShutdownHook(): AnyRef = {
-    Utils.addShutdownHook(Utils.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
+    
ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY
 + 1) { () =>
       logInfo("Shutdown hook called")
       DiskBlockManager.this.doStop()
     }
@@ -154,7 +154,7 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
   private[spark] def stop() {
     // Remove the shutdown hook.  It causes memory leaks if we leave it around.
     try {
-      Utils.removeShutdownHook(shutdownHook)
+      ShutdownHookManager.removeShutdownHook(shutdownHook)
     } catch {
       case e: Exception =>
         logError(s"Exception while removing shutdown hook.", e)
@@ -168,7 +168,9 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
       localDirs.foreach { localDir =>
         if (localDir.isDirectory() && localDir.exists()) {
           try {
-            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) 
Utils.deleteRecursively(localDir)
+            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
+              Utils.deleteRecursively(localDir)
+            }
           } catch {
             case e: Exception =>
               logError(s"Exception while deleting local spark dir: $localDir", 
e)

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index ebad5bc..2287878 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -32,7 +32,7 @@ import tachyon.TachyonURI
 
 import org.apache.spark.Logging
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 
 /**
@@ -80,7 +80,7 @@ private[spark] class TachyonBlockManager() extends 
ExternalBlockManager with Log
     // in order to avoid having really large inodes at the top level in 
Tachyon.
     tachyonDirs = createTachyonDirs()
     subDirs = Array.fill(tachyonDirs.length)(new 
Array[TachyonFile](subDirsPerTachyonDir))
-    tachyonDirs.foreach(tachyonDir => 
Utils.registerShutdownDeleteDir(tachyonDir))
+    tachyonDirs.foreach(tachyonDir => 
ShutdownHookManager.registerShutdownDeleteDir(tachyonDir))
   }
 
   override def toString: String = {"ExternalBlockStore-Tachyon"}
@@ -240,7 +240,7 @@ private[spark] class TachyonBlockManager() extends 
ExternalBlockManager with Log
     logDebug("Shutdown hook called")
     tachyonDirs.foreach { tachyonDir =>
       try {
-        if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+        if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) {
           Utils.deleteRecursively(tachyonDir, client)
         }
       } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
new file mode 100644
index 0000000..61ff9b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+import java.util.PriorityQueue
+
+import scala.util.{Failure, Success, Try}
+import tachyon.client.TachyonFile
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.Logging
+
+/**
+ * Functions for registering, removing, and running Spark's JVM shutdown hooks.
+ */
+private[spark] object ShutdownHookManager extends Logging {
+  val DEFAULT_SHUTDOWN_PRIORITY = 100
+
+  /**
+   * The shutdown priority of the SparkContext instance. This is lower than the default
+   * priority, so that by default hooks are run before the context is shut down.
+   */
+  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
+
+  /**
+   * The shutdown priority of temp directory must be lower than the SparkContext shutdown
+   * priority. Otherwise cleaning the temp directories while Spark jobs are running can
+   * throw undesirable errors at the time of shutdown.
+   */
+  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
+
+  private lazy val shutdownHooks = {
+    val manager = new SparkShutdownHookManager()
+    manager.install()
+    manager
+  }
+
+  private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
+  private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
+
+  // Add a shutdown hook to delete the temp dirs when the JVM exits
+  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
+    logInfo("Shutdown hook called")
+    shutdownDeletePaths.foreach { dirPath =>
+      try {
+        logInfo("Deleting directory " + dirPath)
+        Utils.deleteRecursively(new File(dirPath))
+      } catch {
+        case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
+      }
+    }
+  }
+
+  // Register the path to be deleted via shutdown hook
+  def registerShutdownDeleteDir(file: File) {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths += absolutePath
+    }
+  }
+
+  // Register the tachyon path to be deleted via shutdown hook
+  def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
+    val absolutePath = tachyonfile.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths += absolutePath
+    }
+  }
+
+  // Remove the path to be deleted via shutdown hook
+  def removeShutdownDeleteDir(file: File) {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.remove(absolutePath)
+    }
+  }
+
+  // Remove the tachyon path to be deleted via shutdown hook
+  def removeShutdownDeleteDir(tachyonfile: TachyonFile) {
+    val absolutePath = tachyonfile.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.remove(absolutePath)
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook ?
+  def hasShutdownDeleteDir(file: File): Boolean = {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.contains(absolutePath)
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook ?
+  def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
+    val absolutePath = file.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.contains(absolutePath)
+    }
+  }
+
+  // Note: if file is child of some registered path, while not equal to it, then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to delete each others
+  // paths - resulting in IOException and incomplete cleanup.
+  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
+    val absolutePath = file.getAbsolutePath()
+    val retval = shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.exists { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
+    }
+    retval
+  }
+
+  // Note: if file is child of some registered path, while not equal to it, then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to delete each others
+  // paths - resulting in Exception and incomplete cleanup.
+  def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
+    val absolutePath = file.getPath()
+    val retval = shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.exists { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
+    }
+    retval
+  }
+
+  /**
+   * Detect whether this thread might be executing a shutdown hook. Will always return true if
+   * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
+   * if System.exit was just called by a concurrent thread).
+   *
+   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
+   * an IllegalStateException.
+   */
+  def inShutdown(): Boolean = {
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException => return true
+    }
+    false
+  }
+
+  /**
+   * Adds a shutdown hook with default priority.
+   *
+   * @param hook The code to run during shutdown.
+   * @return A handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(hook: () => Unit): AnyRef = {
+    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
+  }
+
+  /**
+   * Adds a shutdown hook with the given priority. Hooks with higher priority values run
+   * first.
+   *
+   * @param hook The code to run during shutdown.
+   * @return A handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
+    shutdownHooks.add(priority, hook)
+  }
+
+  /**
+   * Remove a previously installed shutdown hook.
+   *
+   * @param ref A handle returned by `addShutdownHook`.
+   * @return Whether the hook was removed.
+   */
+  def removeShutdownHook(ref: AnyRef): Boolean = {
+    shutdownHooks.remove(ref)
+  }
+
+}
+
+private [util] class SparkShutdownHookManager {
+
+  private val hooks = new PriorityQueue[SparkShutdownHook]()
+  private var shuttingDown = false
+
+  /**
+   * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
+   * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
+   * the best.
+   */
+  def install(): Unit = {
+    val hookTask = new Runnable() {
+      override def run(): Unit = runAll()
+    }
+    Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
+      case Success(shmClass) =>
+        val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+          .asInstanceOf[Int]
+        val shm = shmClass.getMethod("get").invoke(null)
+        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+      case Failure(_) =>
+        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+    }
+  }
+
+  def runAll(): Unit = synchronized {
+    shuttingDown = true
+    while (!hooks.isEmpty()) {
+      Try(Utils.logUncaughtExceptions(hooks.poll().run()))
+    }
+  }
+
+  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+    checkState()
+    val hookRef = new SparkShutdownHook(priority, hook)
+    hooks.add(hookRef)
+    hookRef
+  }
+
+  def remove(ref: AnyRef): Boolean = synchronized {
+    hooks.remove(ref)
+  }
+
+  private def checkState(): Unit = {
+    if (shuttingDown) {
+      throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+    }
+  }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+  extends Comparable[SparkShutdownHook] {
+
+  override def compareTo(other: SparkShutdownHook): Int = {
+    other.priority - priority
+  }
+
+  def run(): Unit = hook()
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala 
b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index ad3db1f..7248187 100644
--- 
a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -33,7 +33,7 @@ private[spark] object SparkUncaughtExceptionHandler
 
       // We may have been called from a shutdown hook. If so, we must not call 
System.exit().
       // (If we do, we will deadlock.)
-      if (!Utils.inShutdown()) {
+      if (!ShutdownHookManager.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a90d854..f2abf22 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
 
@@ -65,21 +65,6 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
-  val DEFAULT_SHUTDOWN_PRIORITY = 100
-
-  /**
-   * The shutdown priority of the SparkContext instance. This is lower than 
the default
-   * priority, so that by default hooks are run before the context is shut 
down.
-   */
-  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
-
-  /**
-   * The shutdown priority of temp directory must be lower than the 
SparkContext shutdown
-   * priority. Otherwise cleaning the temp directories while Spark jobs are 
running can
-   * throw undesirable errors at the time of shutdown.
-   */
-  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
-
   /**
    * Define a default value for driver memory here since this value is 
referenced across the code
    * base and nearly all files already use Utils.scala
@@ -90,9 +75,6 @@ private[spark] object Utils extends Logging {
   @volatile private var localRootDirs: Array[String] = null
 
 
-  private val shutdownHooks = new SparkShutdownHookManager()
-  shutdownHooks.install()
-
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -205,86 +187,6 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  private val shutdownDeletePaths = new 
scala.collection.mutable.HashSet[String]()
-  private val shutdownDeleteTachyonPaths = new 
scala.collection.mutable.HashSet[String]()
-
-  // Add a shutdown hook to delete the temp dirs when the JVM exits
-  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
-    logInfo("Shutdown hook called")
-    shutdownDeletePaths.foreach { dirPath =>
-      try {
-        logInfo("Deleting directory " + dirPath)
-        Utils.deleteRecursively(new File(dirPath))
-      } catch {
-        case e: Exception => logError(s"Exception while deleting Spark temp 
dir: $dirPath", e)
-      }
-    }
-  }
-
-  // Register the path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(file: File) {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths += absolutePath
-    }
-  }
-
-  // Register the tachyon path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
-    val absolutePath = tachyonfile.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths += absolutePath
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.contains(absolutePath)
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.contains(absolutePath)
-    }
-  }
-
-  // Note: if file is child of some registered path, while not equal to it, 
then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
-  // paths - resulting in IOException and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    val retval = shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.exists { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
-  // Note: if file is child of some registered path, while not equal to it, 
then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
-  // paths - resulting in Exception and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    val retval = shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.exists { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
   /**
    * JDK equivalent of `chmod 700 file`.
    *
@@ -333,7 +235,7 @@ private[spark] object Utils extends Logging {
       root: String = System.getProperty("java.io.tmpdir"),
       namePrefix: String = "spark"): File = {
     val dir = createDirectory(root, namePrefix)
-    registerShutdownDeleteDir(dir)
+    ShutdownHookManager.registerShutdownDeleteDir(dir)
     dir
   }
 
@@ -973,9 +875,7 @@ private[spark] object Utils extends Logging {
           if (savedIOException != null) {
             throw savedIOException
           }
-          shutdownDeletePaths.synchronized {
-            shutdownDeletePaths.remove(file.getAbsolutePath)
-          }
+          ShutdownHookManager.removeShutdownDeleteDir(file)
         }
       } finally {
         if (!file.delete()) {
@@ -1478,27 +1378,6 @@ private[spark] object Utils extends Logging {
     serializer.deserialize[T](serializer.serialize(value))
   }
 
-  /**
-   * Detect whether this thread might be executing a shutdown hook. Will 
always return true if
-   * the current thread is a running a shutdown hook but may spuriously return 
true otherwise (e.g.
-   * if System.exit was just called by a concurrent thread).
-   *
-   * Currently, this detects whether the JVM is shutting down by 
Runtime#addShutdownHook throwing
-   * an IllegalStateException.
-   */
-  def inShutdown(): Boolean = {
-    try {
-      val hook = new Thread {
-        override def run() {}
-      }
-      Runtime.getRuntime.addShutdownHook(hook)
-      Runtime.getRuntime.removeShutdownHook(hook)
-    } catch {
-      case ise: IllegalStateException => return true
-    }
-    false
-  }
-
   private def isSpace(c: Char): Boolean = {
     " \t\r\n".indexOf(c) != -1
   }
@@ -2222,37 +2101,6 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Adds a shutdown hook with default priority.
-   *
-   * @param hook The code to run during shutdown.
-   * @return A handle that can be used to unregister the shutdown hook.
-   */
-  def addShutdownHook(hook: () => Unit): AnyRef = {
-    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
-  }
-
-  /**
-   * Adds a shutdown hook with the given priority. Hooks with lower priority 
values run
-   * first.
-   *
-   * @param hook The code to run during shutdown.
-   * @return A handle that can be used to unregister the shutdown hook.
-   */
-  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
-    shutdownHooks.add(priority, hook)
-  }
-
-  /**
-   * Remove a previously installed shutdown hook.
-   *
-   * @param ref A handle returned by `addShutdownHook`.
-   * @return Whether the hook was removed.
-   */
-  def removeShutdownHook(ref: AnyRef): Boolean = {
-    shutdownHooks.remove(ref)
-  }
-
-  /**
    * To avoid calling `Utils.getCallSite` for every single RDD we create in 
the body,
    * set a dummy call site that RDDs use instead. This is for performance 
optimization.
    */
@@ -2299,70 +2147,6 @@ private[spark] object Utils extends Logging {
 
 }
 
-private [util] class SparkShutdownHookManager {
-
-  private val hooks = new PriorityQueue[SparkShutdownHook]()
-  private var shuttingDown = false
-
-  /**
-   * Install a hook to run at shutdown and run all registered hooks in order. 
Hadoop 1.x does not
-   * have `ShutdownHookManager`, so in that case we just use the JVM's 
`Runtime` object and hope for
-   * the best.
-   */
-  def install(): Unit = {
-    val hookTask = new Runnable() {
-      override def run(): Unit = runAll()
-    }
-    Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) 
match {
-      case Success(shmClass) =>
-        val fsPriority = 
classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
-          .asInstanceOf[Int]
-        val shm = shmClass.getMethod("get").invoke(null)
-        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], 
classOf[Int])
-          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
-
-      case Failure(_) =>
-        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark 
Shutdown Hook"));
-    }
-  }
-
-  def runAll(): Unit = synchronized {
-    shuttingDown = true
-    while (!hooks.isEmpty()) {
-      Try(Utils.logUncaughtExceptions(hooks.poll().run()))
-    }
-  }
-
-  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
-    checkState()
-    val hookRef = new SparkShutdownHook(priority, hook)
-    hooks.add(hookRef)
-    hookRef
-  }
-
-  def remove(ref: AnyRef): Boolean = synchronized {
-    hooks.remove(ref)
-  }
-
-  private def checkState(): Unit = {
-    if (shuttingDown) {
-      throw new IllegalStateException("Shutdown hooks cannot be modified 
during shutdown.")
-    }
-  }
-
-}
-
-private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
-  extends Comparable[SparkShutdownHook] {
-
-  override def compareTo(other: SparkShutdownHook): Int = {
-    other.priority - priority
-  }
-
-  def run(): Unit = hook()
-
-}
-
 /**
  * A utility class to redirect the child process's stdout or stderr.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 9c04734..2c9fa59 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.{Logging, SparkContext}
 
 
@@ -76,7 +76,7 @@ object HiveThriftServer2 extends Logging {
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
 
-    Utils.addShutdownHook { () =>
+    ShutdownHookManager.addShutdownHook { () =>
       SparkSQLEnv.stop()
       uiTab.foreach(_.detach())
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index d388614..7799704 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -39,7 +39,7 @@ import org.apache.thrift.transport.TSocket
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * This code doesn't support remote connections in Hive 1.2+, as the 
underlying CliDriver
@@ -114,7 +114,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     SessionState.start(sessionState)
 
     // Clean up after we exit
-    Utils.addShutdownHook { () => SparkSQLEnv.stop() }
+    ShutdownHookManager.addShutdownHook { () => SparkSQLEnv.stop() }
 
     val remoteMode = isRemoteMode(sessionState)
     // "-h" option has been passed, so connect to Hive thrift server.

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 296cc5c..4eae699 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.{SparkConf, SparkContext}
 
 /* Implicit conversions */
@@ -154,7 +154,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
   val hiveFilesTemp = File.createTempFile("catalystHiveFiles", "")
   hiveFilesTemp.delete()
   hiveFilesTemp.mkdir()
-  Utils.registerShutdownDeleteDir(hiveFilesTemp)
+  ShutdownHookManager.registerShutdownDeleteDir(hiveFilesTemp)
 
   val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + 
File.separator + "hive")) {
     new File("src" + File.separator + "test" + File.separator + "resources" + 
File.separator)

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 177e710..b496d1f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -44,7 +44,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, 
ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, 
StreamingTab}
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods 
used to create
@@ -604,7 +604,7 @@ class StreamingContext private[streaming] (
           }
           StreamingContext.setActiveContext(this)
         }
-        shutdownHookRef = Utils.addShutdownHook(
+        shutdownHookRef = ShutdownHookManager.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         // Registering Streaming Metrics at the start of the StreamingContext
         assert(env.metricsSystem != null)
@@ -691,7 +691,7 @@ class StreamingContext private[streaming] (
           StreamingContext.setActiveContext(null)
           waiter.notifyStop()
           if (shutdownHookRef != null) {
-            Utils.removeShutdownHook(shutdownHookRef)
+            ShutdownHookManager.removeShutdownHook(shutdownHookRef)
           }
           logInfo("StreamingContext stopped successfully")
       }
@@ -725,7 +725,7 @@ object StreamingContext extends Logging {
    */
   private val ACTIVATION_LOCK = new Object()
 
-  private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 
1
+  private val SHUTDOWN_HOOK_PRIORITY = 
ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
 
   private val activeContext = new AtomicReference[StreamingContext](null)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/74c9dcec/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index e19940d..6a8ddb3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -112,7 +112,8 @@ private[spark] class ApplicationMaster(
       val fs = FileSystem.get(yarnConf)
 
       // This shutdown hook should run *after* the SparkContext is shut down.
-      Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) { () =>
+      val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
+      ShutdownHookManager.addShutdownHook(priority) { () =>
         val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
         val isLastAttempt = client.getAttemptId().getAttemptId() >= 
maxAppAttempts
 
@@ -199,7 +200,7 @@ private[spark] class ApplicationMaster(
   final def finish(status: FinalApplicationStatus, code: Int, msg: String = 
null): Unit = {
     synchronized {
       if (!finished) {
-        val inShutdown = Utils.inShutdown()
+        val inShutdown = ShutdownHookManager.inShutdown()
         logInfo(s"Final app status: $status, exitCode: $code" +
           Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
         exitCode = code

