    --- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
    @@ -142,6 +145,117 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
         val containerIdString = 
    +  /**
    +   * Obtains token for the Hive metastore, using the current user as the principal
    +   * (NOTE(review): the original sentence is cut off after "as the"; "principal" is
    +   * presumed -- confirm).
    +   * Some exceptions are caught and downgraded to a log message.
    +   *
    +   * Delegates to `obtainTokenForHiveMetastoreInner`; any `Exception` it raises is passed to
    +   * `handleTokenIntrospectionFailure`, and `None` is returned if that handler returns normally.
    +   *
    +   * @param conf hadoop configuration; the Hive configuration will be based on this
    +   * @return a token, or `None` if there's no need for a token (no metastore URI or principal
    +   *         in the config), or if a binding exception was caught and downgraded to a log
    +   *         message
    +   */
    +  def obtainTokenForHiveMetastore(conf: Configuration): 
Option[Token[DelegationTokenIdentifier]] = {
    +    try {
    +      obtainTokenForHiveMetastoreInner(conf, 
    +    } catch {
    +      case e: Exception => {
    +        handleTokenIntrospectionFailure("Hive", e)
    +        None
    +      }
    +    }
    +  }
    +  /**
    +   * Handle failures to obtain a token through introspection. Failures to load the class are
    +   * not treated as errors: anything else is.
    +   *
    +   * `ClassNotFoundException` and `NoClassDefFoundError` are logged and swallowed, so the method
    +   * returns normally for them. Every other throwable is rethrown: an
    +   * `InvocationTargetException` is unwrapped to its cause when one is present, other reflection
    +   * failures and runtime exceptions are rethrown as-is, and anything else is wrapped in a
    +   * `RuntimeException`.
    +   *
    +   * @param service service name for error messages
    +   * @param thrown exception caught
    +   * @throws Exception if the `thrown` exception isn't one that is to be ignored
    +   */
    +  private[yarn] def handleTokenIntrospectionFailure(service: String, thrown: Throwable): Unit = {
    +    thrown match {
    +      case e: ClassNotFoundException =>
    +        // one-line summary at info; the full stack trace is only emitted at debug
    +        logInfo(s"$service class not found $e")
    +        logDebug(s"$service Class not found", e)
    +      case e: NoClassDefFoundError =>
    +        logDebug(s"$service class not found", e)
    +      case e: InvocationTargetException =>
    +        // problem talking to the metastore or other hive-side exception:
    +        // surface the underlying cause rather than the reflection wrapper
    +        logInfo(s"$service method invocation failed", e)
    +        throw Option(e.getCause).getOrElse(e)
    +      case e: ReflectiveOperationException =>
    +        // any other reflection failure log at error and rethrow
    +        logError(s"$service Class operation failed", e)
    +        throw e
    +      case e: RuntimeException =>
    +        // any runtime exception, including Illegal Argument Exception
    +        throw e
    +      case t: Throwable =>
    +        // anything unexpected is logged and wrapped so callers see a RuntimeException
    +        val msg = s"$service: Unexpected Exception " + t
    +        logError(msg, t)
    +        throw new RuntimeException(msg, t)
    +    }
    +  }
    +  /**
    +   * Inner routine to obtain a token for the Hive metastore; exceptions are 
raised on any problem.
    +   * @param conf hadoop configuration; the Hive configuration will be 
based on this.
    +   * @param username the username of the principal requesting the 
delegation token.
    +   * @return a delegation token
    +   */
    +  private[yarn] def obtainTokenForHiveMetastoreInner(conf: Configuration,
    +      username: String): Option[Token[DelegationTokenIdentifier]] = {
    +    val mirror = universe.runtimeMirror(getClass.getClassLoader)
    +    // the hive configuration class is a subclass of Hadoop Configuration, 
so can be cast down
    +    // to a Configuration and used without reflection
    +    val hiveConfClass = 
    +    // using the (Configuration, Class) constructor allows the current 
configuration to be included
    +    // in the hive config.
    +    val ctor = hiveConfClass.getDeclaredConstructor(classOf[Configuration],
    +      classOf[Object].getClass)
    +    val hiveConf = ctor.newInstance(conf, 
    +    val metastore_uri = hiveConf.getTrimmed("hive.metastore.uris", "")
    +    // Check for local metastore
    +    if (metastore_uri.nonEmpty) {
    +      if (username.isEmpty) {
    --- End diff --

