Repository: spark
Updated Branches:
  refs/heads/branch-1.4 89c8aea94 -> 8ce86b23f


[SPARK-9826] [CORE] Fix cannot use custom classes in log4j.properties

Refactor Utils class and create ShutdownHookManager.
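
For reference, here is a minimal sketch of how call sites change with this patch: shutdown-hook
registration moves from Utils to the new ShutdownHookManager object (added below in
core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala). The enclosing package and
object in this sketch are hypothetical and only illustrate the API; they are not part of the patch.

// Hypothetical caller; ShutdownHookManager is private[spark], so real callers
// live inside the org.apache.spark package.
package org.apache.spark.example

import org.apache.spark.util.ShutdownHookManager

object ShutdownHookSketch {
  // Before this patch: Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => ... }
  // After this patch, the same hook is registered through ShutdownHookManager:
  val hookRef: AnyRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    // cleanup work to run when the JVM shuts down
  }

  // The returned handle can later be used to unregister the hook.
  def unregister(): Boolean = ShutdownHookManager.removeShutdownHook(hookRef)
}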

NOTE: Wasn't able to run dev/run-tests on a Windows machine.
Manual tests were conducted locally using a custom log4j.properties file with a
Redis appender and a Logstash JSON layout (bundled in the fat jar submitted to
Spark), for example:

log4j.rootCategory=WARN,console,redis
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.spark.graphx.Pregel=INFO

log4j.appender.redis=com.ryantenney.log4j.FailoverRedisAppender
log4j.appender.redis.endpoints=hostname:port
log4j.appender.redis.key=mykey
log4j.appender.redis.alwaysBatch=false
log4j.appender.redis.layout=net.logstash.log4j.JSONEventLayoutV1

Author: michellemay <mle...@gmail.com>

Closes #8109 from michellemay/SPARK-9826.

(cherry picked from commit ab7e721cfec63155641e81e72b4ad43cf6a7d4c7)

Conflicts:
        core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
        core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
        core/src/main/scala/org/apache/spark/util/Utils.scala
        sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
        sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
        sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
        sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ce86b23
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ce86b23
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ce86b23

Branch: refs/heads/branch-1.4
Commit: 8ce86b23f41fe6d1db4c64ec92fb8ddb438bb655
Parents: 89c8aea
Author: Michel Lemay <mle...@gmail.com>
Authored: Wed Aug 12 16:17:58 2015 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Aug 12 16:56:52 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |   5 +-
 .../spark/deploy/history/HistoryServer.scala    |   4 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |   7 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   4 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   4 +-
 .../apache/spark/storage/DiskBlockManager.scala |  10 +-
 .../spark/storage/TachyonBlockManager.scala     |   6 +-
 .../apache/spark/util/ShutdownHookManager.scala | 266 +++++++++++++++++++
 .../util/SparkUncaughtExceptionHandler.scala    |   2 +-
 .../scala/org/apache/spark/util/Utils.scala     | 222 +---------------
 .../spark/sql/sources/SqlNewHadoopRDD.scala     |   4 +-
 .../hive/thriftserver/HiveThriftServer2.scala   |   4 +-
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |   4 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |   4 +-
 .../spark/streaming/StreamingContext.scala      |   8 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |   5 +-
 16 files changed, 307 insertions(+), 252 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3caae87..25eae38 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -554,7 +554,8 @@ class SparkContext(config: SparkConf) extends Logging with 
ExecutorAllocationCli
     // Make sure the context is stopped if the user forgets about it. This 
avoids leaving
     // unfinished event logs around after the JVM exits cleanly. It doesn't 
help if the JVM
     // is killed, though.
-    _shutdownHookRef = Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
+    _shutdownHookRef = ShutdownHookManager.addShutdownHook(
+      ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
       logInfo("Invoking stop() from shutdown hook")
       stop()
     }
@@ -1627,7 +1628,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
       return
     }
     if (_shutdownHookRef != null) {
-      Utils.removeShutdownHook(_shutdownHookRef)
+      ShutdownHookManager.removeShutdownHook(_shutdownHookRef)
     }
 
     Utils.tryLogNonFatalError {

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala 
b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 5a0eb58..250825b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -29,7 +29,7 @@ import org.apache.spark.status.api.v1.{ApiRootResource, 
ApplicationInfo, Applica
   UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.{ShutdownHookManager, SignalLogger, Utils}
 
 /**
  * A web server that renders SparkUIs of completed applications.
@@ -230,7 +230,7 @@ object HistoryServer extends Logging {
     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
 
-    Utils.addShutdownHook { () => server.stop() }
+    ShutdownHookManager.addShutdownHook { () => server.stop() }
 
     // Wait until the end of the world... or if the HistoryServer process is 
manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7aa85b7..f58903e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,7 +28,7 @@ import com.google.common.io.Files
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.util.logging.FileAppender
 
 /**
@@ -70,7 +70,8 @@ private[deploy] class ExecutorRunner(
     }
     workerThread.start()
     // Shutdown hook that kills actors on shutdown.
-    shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
+    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
+      killProcess(Some("Worker shutting down")) }
   }
 
   /**
@@ -102,7 +103,7 @@ private[deploy] class ExecutorRunner(
       workerThread = null
       state = ExecutorState.KILLED
       try {
-        Utils.removeShutdownHook(shutdownHook)
+        ShutdownHookManager.removeShutdownHook(shutdownHook)
       } catch {
         case e: IllegalStateException => None
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 2cefe63..0198234 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.{NextIterator, Utils}
+import org.apache.spark.util.{NextIterator, ShutdownHookManager, Utils}
 import org.apache.spark.scheduler.{HostTaskLocation, HDFSCacheTaskLocation}
 import org.apache.spark.storage.StorageLevel
 
@@ -274,7 +274,7 @@ class HadoopRDD[K, V](
           }
         } catch {
           case e: Exception => {
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 84456d6..7c4e4fc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -33,7 +33,7 @@ import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.StorageLevel
 
@@ -175,7 +175,7 @@ class NewHadoopRDD[K, V](
           }
         } catch {
           case e: Exception => {
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 91ef863..e619123 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -22,7 +22,7 @@ import java.io.{IOException, File}
 
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
  * Creates and maintains the logical mapping between logical blocks and 
physical on-disk
@@ -139,7 +139,7 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
   }
 
   private def addShutdownHook(): AnyRef = {
-    Utils.addShutdownHook(Utils.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
+    ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
       logInfo("Shutdown hook called")
       DiskBlockManager.this.doStop()
     }
@@ -149,7 +149,7 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
   private[spark] def stop() {
     // Remove the shutdown hook.  It causes memory leaks if we leave it around.
     try {
-      Utils.removeShutdownHook(shutdownHook)
+      ShutdownHookManager.removeShutdownHook(shutdownHook)
     } catch {
       case e: Exception =>
         logError(s"Exception while removing shutdown hook.", e)
@@ -163,7 +163,9 @@ private[spark] class DiskBlockManager(blockManager: 
BlockManager, conf: SparkCon
       localDirs.foreach { localDir =>
         if (localDir.isDirectory() && localDir.exists()) {
           try {
-            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+            if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(localDir)) {
+              Utils.deleteRecursively(localDir)
+            }
           } catch {
             case e: Exception =>
               logError(s"Exception while deleting local spark dir: $localDir", 
e)

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index d3b8c80..60f8855 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -28,7 +28,7 @@ import tachyon.TachyonURI
 
 import org.apache.spark.{SparkException, SparkConf, Logging}
 import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 
 /**
@@ -73,7 +73,7 @@ private[spark] class TachyonBlockManager() extends 
ExternalBlockManager with Log
     // in order to avoid having really large inodes at the top level in 
Tachyon.
     tachyonDirs = createTachyonDirs()
     subDirs = Array.fill(tachyonDirs.length)(new 
Array[TachyonFile](subDirsPerTachyonDir))
-    tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
+    tachyonDirs.foreach(tachyonDir => ShutdownHookManager.registerShutdownDeleteDir(tachyonDir))
   }
 
   override def toString: String = {"ExternalBlockStore-Tachyon"}
@@ -202,7 +202,7 @@ private[spark] class TachyonBlockManager() extends 
ExternalBlockManager with Log
     logDebug("Shutdown hook called")
     tachyonDirs.foreach { tachyonDir =>
       try {
-        if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+        if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) {
           Utils.deleteRecursively(tachyonDir, client)
         }
       } catch {

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala 
b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
new file mode 100644
index 0000000..61ff9b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import java.io.File
+import java.util.PriorityQueue
+
+import scala.util.{Failure, Success, Try}
+import tachyon.client.TachyonFile
+
+import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.Logging
+
+/**
+ * Utility functions for managing shutdown hooks and paths registered for deletion at shutdown.
+ */
+private[spark] object ShutdownHookManager extends Logging {
+  val DEFAULT_SHUTDOWN_PRIORITY = 100
+
+  /**
+   * The shutdown priority of the SparkContext instance. This is lower than 
the default
+   * priority, so that by default hooks are run before the context is shut 
down.
+   */
+  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
+
+  /**
+   * The shutdown priority of temp directory must be lower than the 
SparkContext shutdown
+   * priority. Otherwise cleaning the temp directories while Spark jobs are 
running can
+   * throw undesirable errors at the time of shutdown.
+   */
+  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
+
+  private lazy val shutdownHooks = {
+    val manager = new SparkShutdownHookManager()
+    manager.install()
+    manager
+  }
+
+  private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
+  private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
+
+  // Add a shutdown hook to delete the temp dirs when the JVM exits
+  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
+    logInfo("Shutdown hook called")
+    shutdownDeletePaths.foreach { dirPath =>
+      try {
+        logInfo("Deleting directory " + dirPath)
+        Utils.deleteRecursively(new File(dirPath))
+      } catch {
+        case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
+      }
+    }
+  }
+
+  // Register the path to be deleted via shutdown hook
+  def registerShutdownDeleteDir(file: File) {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths += absolutePath
+    }
+  }
+
+  // Register the tachyon path to be deleted via shutdown hook
+  def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
+    val absolutePath = tachyonfile.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths += absolutePath
+    }
+  }
+
+  // Remove the path to be deleted via shutdown hook
+  def removeShutdownDeleteDir(file: File) {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.remove(absolutePath)
+    }
+  }
+
+  // Remove the tachyon path to be deleted via shutdown hook
+  def removeShutdownDeleteDir(tachyonfile: TachyonFile) {
+    val absolutePath = tachyonfile.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.remove(absolutePath)
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook ?
+  def hasShutdownDeleteDir(file: File): Boolean = {
+    val absolutePath = file.getAbsolutePath()
+    shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.contains(absolutePath)
+    }
+  }
+
+  // Is the path already registered to be deleted via a shutdown hook ?
+  def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
+    val absolutePath = file.getPath()
+    shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.contains(absolutePath)
+    }
+  }
+
+  // Note: if file is child of some registered path, while not equal to it, 
then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
+  // paths - resulting in IOException and incomplete cleanup.
+  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
+    val absolutePath = file.getAbsolutePath()
+    val retval = shutdownDeletePaths.synchronized {
+      shutdownDeletePaths.exists { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
+    }
+    retval
+  }
+
+  // Note: if file is child of some registered path, while not equal to it, 
then return true;
+  // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
+  // paths - resulting in Exception and incomplete cleanup.
+  def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
+    val absolutePath = file.getPath()
+    val retval = shutdownDeleteTachyonPaths.synchronized {
+      shutdownDeleteTachyonPaths.exists { path =>
+        !absolutePath.equals(path) && absolutePath.startsWith(path)
+      }
+    }
+    if (retval) {
+      logInfo("path = " + file + ", already present as root for deletion.")
+    }
+    retval
+  }
+
+  /**
+   * Detect whether this thread might be executing a shutdown hook. Will always return true if
+   * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
+   * if System.exit was just called by a concurrent thread).
+   *
+   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
+   * an IllegalStateException.
+   */
+  def inShutdown(): Boolean = {
+    try {
+      val hook = new Thread {
+        override def run() {}
+      }
+      Runtime.getRuntime.addShutdownHook(hook)
+      Runtime.getRuntime.removeShutdownHook(hook)
+    } catch {
+      case ise: IllegalStateException => return true
+    }
+    false
+  }
+
+  /**
+   * Adds a shutdown hook with default priority.
+   *
+   * @param hook The code to run during shutdown.
+   * @return A handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(hook: () => Unit): AnyRef = {
+    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
+  }
+
+  /**
+   * Adds a shutdown hook with the given priority. Hooks with higher priority values run
+   * first.
+   *
+   * @param hook The code to run during shutdown.
+   * @return A handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
+    shutdownHooks.add(priority, hook)
+  }
+
+  /**
+   * Remove a previously installed shutdown hook.
+   *
+   * @param ref A handle returned by `addShutdownHook`.
+   * @return Whether the hook was removed.
+   */
+  def removeShutdownHook(ref: AnyRef): Boolean = {
+    shutdownHooks.remove(ref)
+  }
+
+}
+
+private [util] class SparkShutdownHookManager {
+
+  private val hooks = new PriorityQueue[SparkShutdownHook]()
+  private var shuttingDown = false
+
+  /**
+   * Install a hook to run at shutdown and run all registered hooks in order. 
Hadoop 1.x does not
+   * have `ShutdownHookManager`, so in that case we just use the JVM's 
`Runtime` object and hope for
+   * the best.
+   */
+  def install(): Unit = {
+    val hookTask = new Runnable() {
+      override def run(): Unit = runAll()
+    }
+    Try(Utils.classForName("org.apache.hadoop.util.ShutdownHookManager")) match {
+      case Success(shmClass) =>
+        val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+          .asInstanceOf[Int]
+        val shm = shmClass.getMethod("get").invoke(null)
+        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+      case Failure(_) =>
+        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+    }
+    }
+  }
+
+  def runAll(): Unit = synchronized {
+    shuttingDown = true
+    while (!hooks.isEmpty()) {
+      Try(Utils.logUncaughtExceptions(hooks.poll().run()))
+    }
+  }
+
+  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+    checkState()
+    val hookRef = new SparkShutdownHook(priority, hook)
+    hooks.add(hookRef)
+    hookRef
+  }
+
+  def remove(ref: AnyRef): Boolean = synchronized {
+    hooks.remove(ref)
+  }
+
+  private def checkState(): Unit = {
+    if (shuttingDown) {
+      throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+    }
+  }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+  extends Comparable[SparkShutdownHook] {
+
+  override def compareTo(other: SparkShutdownHook): Int = {
+    other.priority - priority
+  }
+
+  def run(): Unit = hook()
+
+}
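
As a usage sketch (hypothetical example object, not part of the patch), the temp-directory pattern
from the DiskBlockManager hunk above composes with the new API roughly as follows; the directory
name here is made up for illustration:

// Hypothetical caller inside the org.apache.spark package (the API is private[spark]).
package org.apache.spark.example

import java.io.File

import org.apache.spark.util.{ShutdownHookManager, Utils}

object TempDirCleanupSketch {
  // Create a scratch directory and register it for deletion when the JVM exits.
  val dir = new File(System.getProperty("java.io.tmpdir"), "spark-example-" + System.nanoTime())
  dir.mkdirs()
  ShutdownHookManager.registerShutdownDeleteDir(dir)

  // Mirror DiskBlockManager: add a hook one notch above TEMP_DIR_SHUTDOWN_PRIORITY
  // and skip directories that are already covered by a registered parent path.
  val hookRef: AnyRef = ShutdownHookManager.addShutdownHook(
      ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
    if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(dir)) {
      Utils.deleteRecursively(dir)
    }
  }
}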

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala 
b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
index ad3db1f..7248187 100644
--- 
a/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
+++ 
b/core/src/main/scala/org/apache/spark/util/SparkUncaughtExceptionHandler.scala
@@ -33,7 +33,7 @@ private[spark] object SparkUncaughtExceptionHandler
 
       // We may have been called from a shutdown hook. If so, we must not call 
System.exit().
       // (If we do, we will deadlock.)
-      if (!Utils.inShutdown()) {
+      if (!ShutdownHookManager.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
           System.exit(SparkExitCode.OOM)
         } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1e1872e..615944e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
+import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
 
@@ -65,28 +65,10 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
-  val DEFAULT_SHUTDOWN_PRIORITY = 100
-
-  /**
-   * The shutdown priority of the SparkContext instance. This is lower than 
the default
-   * priority, so that by default hooks are run before the context is shut 
down.
-   */
-  val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
-
-  /**
-   * The shutdown priority of temp directory must be lower than the 
SparkContext shutdown
-   * priority. Otherwise cleaning the temp directories while Spark jobs are 
running can
-   * throw undesirable errors at the time of shutdown.
-   */
-  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
-
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
 
-  private val shutdownHooks = new SparkShutdownHookManager()
-  shutdownHooks.install()
-
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -192,86 +174,6 @@ private[spark] object Utils extends Logging {
     }
   }
 
-  private val shutdownDeletePaths = new 
scala.collection.mutable.HashSet[String]()
-  private val shutdownDeleteTachyonPaths = new 
scala.collection.mutable.HashSet[String]()
-
-  // Add a shutdown hook to delete the temp dirs when the JVM exits
-  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
-    logInfo("Shutdown hook called")
-    shutdownDeletePaths.foreach { dirPath =>
-      try {
-        logInfo("Deleting directory " + dirPath)
-        Utils.deleteRecursively(new File(dirPath))
-      } catch {
-        case e: Exception => logError(s"Exception while deleting Spark temp 
dir: $dirPath", e)
-      }
-    }
-  }
-
-  // Register the path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(file: File) {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths += absolutePath
-    }
-  }
-
-  // Register the tachyon path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
-    val absolutePath = tachyonfile.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths += absolutePath
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.contains(absolutePath)
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.contains(absolutePath)
-    }
-  }
-
-  // Note: if file is child of some registered path, while not equal to it, 
then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
-  // paths - resulting in IOException and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    val retval = shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.exists { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
-  // Note: if file is child of some registered path, while not equal to it, 
then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to 
delete each others
-  // paths - resulting in Exception and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
-    val absolutePath = file.getPath()
-    val retval = shutdownDeleteTachyonPaths.synchronized {
-      shutdownDeleteTachyonPaths.exists { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
   /**
    * JDK equivalent of `chmod 700 file`.
    *
@@ -320,7 +222,7 @@ private[spark] object Utils extends Logging {
       root: String = System.getProperty("java.io.tmpdir"),
       namePrefix: String = "spark"): File = {
     val dir = createDirectory(root, namePrefix)
-    registerShutdownDeleteDir(dir)
+    ShutdownHookManager.registerShutdownDeleteDir(dir)
     dir
   }
 
@@ -953,9 +855,7 @@ private[spark] object Utils extends Logging {
           if (savedIOException != null) {
             throw savedIOException
           }
-          shutdownDeletePaths.synchronized {
-            shutdownDeletePaths.remove(file.getAbsolutePath)
-          }
+          ShutdownHookManager.removeShutdownDeleteDir(file)
         }
       } finally {
         if (!file.delete()) {
@@ -1459,27 +1359,6 @@ private[spark] object Utils extends Logging {
     serializer.deserialize[T](serializer.serialize(value))
   }
 
-  /**
-   * Detect whether this thread might be executing a shutdown hook. Will 
always return true if
-   * the current thread is a running a shutdown hook but may spuriously return 
true otherwise (e.g.
-   * if System.exit was just called by a concurrent thread).
-   *
-   * Currently, this detects whether the JVM is shutting down by 
Runtime#addShutdownHook throwing
-   * an IllegalStateException.
-   */
-  def inShutdown(): Boolean = {
-    try {
-      val hook = new Thread {
-        override def run() {}
-      }
-      Runtime.getRuntime.addShutdownHook(hook)
-      Runtime.getRuntime.removeShutdownHook(hook)
-    } catch {
-      case ise: IllegalStateException => return true
-    }
-    false
-  }
-
   private def isSpace(c: Char): Boolean = {
     " \t\r\n".indexOf(c) != -1
   }
@@ -2180,37 +2059,6 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Adds a shutdown hook with default priority.
-   *
-   * @param hook The code to run during shutdown.
-   * @return A handle that can be used to unregister the shutdown hook.
-   */
-  def addShutdownHook(hook: () => Unit): AnyRef = {
-    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY)(hook)
-  }
-
-  /**
-   * Adds a shutdown hook with the given priority. Hooks with lower priority 
values run
-   * first.
-   *
-   * @param hook The code to run during shutdown.
-   * @return A handle that can be used to unregister the shutdown hook.
-   */
-  def addShutdownHook(priority: Int)(hook: () => Unit): AnyRef = {
-    shutdownHooks.add(priority, hook)
-  }
-
-  /**
-   * Remove a previously installed shutdown hook.
-   *
-   * @param ref A handle returned by `addShutdownHook`.
-   * @return Whether the hook was removed.
-   */
-  def removeShutdownHook(ref: AnyRef): Boolean = {
-    shutdownHooks.remove(ref)
-  }
-
-  /**
    * To avoid calling `Utils.getCallSite` for every single RDD we create in 
the body,
    * set a dummy call site that RDDs use instead. This is for performance 
optimization.
    */
@@ -2246,70 +2094,6 @@ private[spark] object Utils extends Logging {
 
 }
 
-private [util] class SparkShutdownHookManager {
-
-  private val hooks = new PriorityQueue[SparkShutdownHook]()
-  private var shuttingDown = false
-
-  /**
-   * Install a hook to run at shutdown and run all registered hooks in order. 
Hadoop 1.x does not
-   * have `ShutdownHookManager`, so in that case we just use the JVM's 
`Runtime` object and hope for
-   * the best.
-   */
-  def install(): Unit = {
-    val hookTask = new Runnable() {
-      override def run(): Unit = runAll()
-    }
-    Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
-      case Success(shmClass) =>
-        val fsPriority = 
classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
-          .asInstanceOf[Int]
-        val shm = shmClass.getMethod("get").invoke(null)
-        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], 
classOf[Int])
-          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
-
-      case Failure(_) =>
-        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark 
Shutdown Hook"));
-    }
-  }
-
-  def runAll(): Unit = synchronized {
-    shuttingDown = true
-    while (!hooks.isEmpty()) {
-      Try(Utils.logUncaughtExceptions(hooks.poll().run()))
-    }
-  }
-
-  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
-    checkState()
-    val hookRef = new SparkShutdownHook(priority, hook)
-    hooks.add(hookRef)
-    hookRef
-  }
-
-  def remove(ref: AnyRef): Boolean = synchronized {
-    hooks.remove(ref)
-  }
-
-  private def checkState(): Unit = {
-    if (shuttingDown) {
-      throw new IllegalStateException("Shutdown hooks cannot be modified 
during shutdown.")
-    }
-  }
-
-}
-
-private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
-  extends Comparable[SparkShutdownHook] {
-
-  override def compareTo(other: SparkShutdownHook): Int = {
-    other.priority - priority
-  }
-
-  def run(): Unit = hook()
-
-}
-
 /**
  * A utility class to redirect the child process's stdout or stderr.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
index ebad0c1..d5aeb46 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/SqlNewHadoopRDD.scala
@@ -34,7 +34,7 @@ import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark.rdd.{RDD, HadoopRDD}
 import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 import scala.reflect.ClassTag
 
@@ -195,7 +195,7 @@ private[sql] class SqlNewHadoopRDD[K, V](
           }
         } catch {
           case e: Exception => {
-            if (!Utils.inShutdown()) {
+            if (!ShutdownHookManager.inShutdown()) {
               logWarning("Exception in RecordReader.close()", e)
             }
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index 94687ee..d03f9f7 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.sql.hive.thriftserver.ui.ThriftServerTab
 import org.apache.spark.sql.hive.{HiveContext, HiveShim}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.{Logging, SparkContext}
 
 /**
@@ -72,7 +72,7 @@ object HiveThriftServer2 extends Logging {
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
 
-    Utils.addShutdownHook { () =>
+    ShutdownHookManager.addShutdownHook { () =>
       SparkSQLEnv.stop()
       uiTab.foreach(_.detach())
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 14f6f65..c56d807 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -38,7 +38,7 @@ import org.apache.thrift.transport.TSocket
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.hive.{HiveContext, HiveShim}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 private[hive] object SparkSQLCLIDriver {
   private var prompt = "spark-sql"
@@ -109,7 +109,7 @@ private[hive] object SparkSQLCLIDriver {
     SessionState.start(sessionState)
 
     // Clean up after we exit
-    Utils.addShutdownHook { () => SparkSQLEnv.stop() }
+    ShutdownHookManager.addShutdownHook { () => SparkSQLEnv.stop() }
 
     // "-h" option has been passed, so connect to Hive thrift server.
     if (sessionState.getHost != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 1b6c32a..0bd6907 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.execution.CacheTableCommand
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.execution.HiveNativeCommand
 import org.apache.spark.sql.SQLConf
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ShutdownHookManager, Utils}
 import org.apache.spark.{SparkConf, SparkContext}
 
 import scala.collection.mutable
@@ -146,7 +146,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
   val hiveFilesTemp = File.createTempFile("catalystHiveFiles", "")
   hiveFilesTemp.delete()
   hiveFilesTemp.mkdir()
-  Utils.registerShutdownDeleteDir(hiveFilesTemp)
+  ShutdownHookManager.registerShutdownDeleteDir(hiveFilesTemp)
 
   val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + 
File.separator + "hive")) {
     new File("src" + File.separator + "test" + File.separator + "resources" + 
File.separator)

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 6b78a82..938f0b2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, 
ActorSupervisorStrategy, Receiver}
 import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, 
StreamingTab}
-import org.apache.spark.util.{CallSite, Utils}
+import org.apache.spark.util.{CallSite, ShutdownHookManager, Utils}
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods 
used to create
@@ -601,7 +601,7 @@ class StreamingContext private[streaming] (
           }
           StreamingContext.setActiveContext(this)
         }
-        shutdownHookRef = Utils.addShutdownHook(
+        shutdownHookRef = ShutdownHookManager.addShutdownHook(
           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
         // Registering Streaming Metrics at the start of the StreamingContext
         assert(env.metricsSystem != null)
@@ -688,7 +688,7 @@ class StreamingContext private[streaming] (
           StreamingContext.setActiveContext(null)
           waiter.notifyStop()
           if (shutdownHookRef != null) {
-            Utils.removeShutdownHook(shutdownHookRef)
+            ShutdownHookManager.removeShutdownHook(shutdownHookRef)
           }
           logInfo("StreamingContext stopped successfully")
       }
@@ -722,7 +722,7 @@ object StreamingContext extends Logging {
    */
   private val ACTIVATION_LOCK = new Object()
 
-  private val SHUTDOWN_HOOK_PRIORITY = Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
+  private val SHUTDOWN_HOOK_PRIORITY = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY + 1
 
   private val activeContext = new AtomicReference[StreamingContext](null)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8ce86b23/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git 
a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2975296..04efe57 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -102,7 +102,8 @@ private[spark] class ApplicationMaster(
       val fs = FileSystem.get(yarnConf)
 
       // This shutdown hook should run *after* the SparkContext is shut down.
-      Utils.addShutdownHook(Utils.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1) { () =>
+      val priority = ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY - 1
+      ShutdownHookManager.addShutdownHook(priority) { () =>
         val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
         val isLastAttempt = client.getAttemptId().getAttemptId() >= 
maxAppAttempts
 
@@ -189,7 +190,7 @@ private[spark] class ApplicationMaster(
   final def finish(status: FinalApplicationStatus, code: Int, msg: String = 
null): Unit = {
     synchronized {
       if (!finished) {
-        val inShutdown = Utils.inShutdown()
+        val inShutdown = ShutdownHookManager.inShutdown()
         logInfo(s"Final app status: $status, exitCode: $code" +
           Option(msg).map(msg => s", (reason: $msg)").getOrElse(""))
         exitCode = code

