Repository: spark
Updated Branches:
  refs/heads/master adbdb19a7 -> e0628f2fa


Revert "[SPARK-5342] [YARN] Allow long running Spark apps to run on secure 
YARN/HDFS"

This reverts commit 6c65da6bb7d1213e6a4a9f7fd1597d029d87d07c.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e0628f2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e0628f2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e0628f2f

Branch: refs/heads/master
Commit: e0628f2fae7f99d096f9dd625876a60d11020d9b
Parents: adbdb19
Author: Patrick Wendell <patr...@databricks.com>
Authored: Thu Apr 30 14:59:20 2015 -0700
Committer: Patrick Wendell <patr...@databricks.com>
Committed: Thu Apr 30 14:59:20 2015 -0700

----------------------------------------------------------------------
 .../deploy/ExecutorDelegationTokenUpdater.scala | 105 ----------
 .../apache/spark/deploy/SparkHadoopUtil.scala   |  69 +------
 .../org/apache/spark/deploy/SparkSubmit.scala   |   4 -
 .../spark/deploy/SparkSubmitArguments.scala     |  15 --
 .../executor/CoarseGrainedExecutorBackend.scala |  13 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   4 +-
 docs/security.md                                |   2 -
 .../spark/launcher/SparkSubmitOptionParser.java |   6 +-
 .../deploy/yarn/AMDelegationTokenRenewer.scala  | 203 -------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   |  17 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 123 +++++------
 .../spark/deploy/yarn/ClientArguments.scala     |  10 -
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |  60 +-----
 .../apache/spark/deploy/yarn/ClientSuite.scala  |  51 +++++
 .../spark/deploy/yarn/YarnClusterSuite.scala    |   2 -
 .../deploy/yarn/YarnSparkHadoopUtilSuite.scala  |  62 +-----
 16 files changed, 111 insertions(+), 635 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
deleted file mode 100644
index 80363aa..0000000
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorDelegationTokenUpdater.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy
-
-import java.util.concurrent.{Executors, TimeUnit}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.{Credentials, UserGroupInformation}
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.{ThreadUtils, Utils}
-
-import scala.util.control.NonFatal
-
-private[spark] class ExecutorDelegationTokenUpdater(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration) extends Logging {
-
-  @volatile private var lastCredentialsFileSuffix = 0
-
-  private val credentialsFile = sparkConf.get("spark.yarn.credentials.file")
-
-  private val delegationTokenRenewer =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
-
-  // On the executor, this thread wakes up and picks up new tokens from HDFS, if any.
-  private val executorUpdaterRunnable =
-    new Runnable {
-      override def run(): Unit = Utils.logUncaughtExceptions(updateCredentialsIfRequired())
-    }
-
-  def updateCredentialsIfRequired(): Unit = {
-    try {
-      val credentialsFilePath = new Path(credentialsFile)
-      val remoteFs = FileSystem.get(hadoopConf)
-      SparkHadoopUtil.get.listFilesSorted(
-        remoteFs, credentialsFilePath.getParent,
-        credentialsFilePath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.foreach { credentialsStatus =>
-        val suffix = SparkHadoopUtil.get.getSuffixForCredentialsPath(credentialsStatus.getPath)
-        if (suffix > lastCredentialsFileSuffix) {
-          logInfo("Reading new delegation tokens from " + 
credentialsStatus.getPath)
-          val newCredentials = getCredentialsFromHDFSFile(remoteFs, 
credentialsStatus.getPath)
-          lastCredentialsFileSuffix = suffix
-          UserGroupInformation.getCurrentUser.addCredentials(newCredentials)
-          logInfo("Tokens updated from credentials file.")
-        } else {
-          // Check every hour to see if new credentials arrived.
-          logInfo("Updated delegation tokens were expected, but the driver has 
not updated the " +
-            "tokens yet, will check again in an hour.")
-          delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, TimeUnit.HOURS)
-          return
-        }
-      }
-      val timeFromNowToRenewal =
-        SparkHadoopUtil.get.getTimeFromNowToRenewal(
-          sparkConf, 0.8, UserGroupInformation.getCurrentUser.getCredentials)
-      if (timeFromNowToRenewal <= 0) {
-        executorUpdaterRunnable.run()
-      } else {
-        logInfo(s"Scheduling token refresh from HDFS in $timeFromNowToRenewal 
millis.")
-        delegationTokenRenewer.schedule(
-          executorUpdaterRunnable, timeFromNowToRenewal, TimeUnit.MILLISECONDS)
-      }
-    } catch {
-      // Since the file may get deleted while we are reading it, catch the Exception and come
-      // back in an hour to try again
-      case NonFatal(e) =>
-        logWarning("Error while trying to update credentials, will try again 
in 1 hour", e)
-        delegationTokenRenewer.schedule(executorUpdaterRunnable, 1, 
TimeUnit.HOURS)
-    }
-  }
-
-  private def getCredentialsFromHDFSFile(remoteFs: FileSystem, tokenPath: Path): Credentials = {
-    val stream = remoteFs.open(tokenPath)
-    try {
-      val newCredentials = new Credentials()
-      newCredentials.readTokenStorageStream(stream)
-      newCredentials
-    } finally {
-      stream.close()
-    }
-  }
-
-  def stop(): Unit = {
-    delegationTokenRenewer.shutdown()
-  }
-
-}

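For context on the executor-side logic deleted above: the updater keyed each credentials file on a numeric counter appended after a "-" delimiter, ignored half-written files ending in ".tmp", and only re-read credentials when a file with a larger counter than the last one consumed appeared. A minimal, self-contained sketch of that selection step in plain Scala (the file names and helper names below are illustrative, not the removed Hadoop-based code):

    object CredentialsFileSelection {
      private val CredsCounterDelim = "-"      // counter delimiter, as in SPARK_YARN_CREDS_COUNTER_DELIM
      private val CredsTempExtension = ".tmp"  // temp-file suffix, as in SPARK_YARN_CREDS_TEMP_EXTENSION

      // Extract the monotonically increasing counter from a name like "credentials-3".
      def suffixFor(fileName: String): Int =
        fileName.substring(fileName.lastIndexOf(CredsCounterDelim) + 1).toInt

      // Given the file names currently visible and the last suffix already consumed,
      // return the newest file that has not been read yet, if any.
      def newestUnread(fileNames: Seq[String], lastReadSuffix: Int): Option[String] =
        fileNames
          .filterNot(_.endsWith(CredsTempExtension))  // skip files still being written
          .sortBy(suffixFor)                          // oldest to newest by counter
          .lastOption
          .filter(name => suffixFor(name) > lastReadSuffix)

      def main(args: Array[String]): Unit = {
        val visible = Seq("credentials-1", "credentials-2", "credentials-3.tmp")
        println(newestUnread(visible, lastReadSuffix = 1))  // Some(credentials-2)
      }
    }
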
http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 00194fb..cfaebf9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.deploy
 
-import java.io.{ByteArrayInputStream, DataInputStream}
 import java.lang.reflect.Method
 import java.security.PrivilegedExceptionAction
-import java.util.{Arrays, Comparator}
 
-import com.google.common.primitives.Longs
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
+import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
 import org.apache.hadoop.fs.FileSystem.Statistics
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.JobContext
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -36,7 +32,6 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.Utils
 
 import scala.collection.JavaConversions._
-import scala.concurrent.duration._
 
 /**
  * :: DeveloperApi ::
@@ -44,8 +39,7 @@ import scala.concurrent.duration._
  */
 @DeveloperApi
 class SparkHadoopUtil extends Logging {
-  private val sparkConf = new SparkConf()
-  val conf: Configuration = newConfiguration(sparkConf)
+  val conf: Configuration = newConfiguration(new SparkConf())
   UserGroupInformation.setConfiguration(conf)
 
   /**
@@ -207,61 +201,6 @@ class SparkHadoopUtil extends Logging {
     if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
   }
 
-  /**
-   * Lists all the files in a directory with the specified prefix, and does 
not end with the
-   * given suffix. The returned {{FileStatus}} instances are sorted by the 
modification times of
-   * the respective files.
-   */
-  def listFilesSorted(
-      remoteFs: FileSystem,
-      dir: Path,
-      prefix: String,
-      exclusionSuffix: String): Array[FileStatus] = {
-    val fileStatuses = remoteFs.listStatus(dir,
-      new PathFilter {
-        override def accept(path: Path): Boolean = {
-          val name = path.getName
-          name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
-        }
-      })
-    Arrays.sort(fileStatuses, new Comparator[FileStatus] {
-      override def compare(o1: FileStatus, o2: FileStatus): Int = {
-        Longs.compare(o1.getModificationTime, o2.getModificationTime)
-      }
-    })
-    fileStatuses
-  }
-
-  /**
-   * How much time is remaining (in millis) from now to (fraction * renewal time for the token that
-   * is valid the latest)?
-   * This will return -ve (or 0) value if the fraction of validity has already expired.
-   */
-  def getTimeFromNowToRenewal(
-      sparkConf: SparkConf,
-      fraction: Double,
-      credentials: Credentials): Long = {
-    val now = System.currentTimeMillis()
-
-    val renewalInterval =
-      sparkConf.getLong("spark.yarn.token.renewal.interval", (24 
hours).toMillis)
-
-    credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .map { t =>
-      val identifier = new DelegationTokenIdentifier()
-      identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-      (identifier.getIssueDate + fraction * renewalInterval).toLong - now
-    }.foldLeft(0L)(math.max)
-  }
-
-
-  private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
-    val fileName = credentialsPath.getName
-    fileName.substring(
-      fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
-  }
-
-
   private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored
 
   /**
@@ -312,10 +251,6 @@ object SparkHadoopUtil {
     }
   }
 
-  val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"
-
-  val SPARK_YARN_CREDS_COUNTER_DELIM = "-"
-
   def get: SparkHadoopUtil = {
     hadoop
   }

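The removed getTimeFromNowToRenewal drove both the driver-side (75% threshold) and executor-side (80% threshold) schedules elsewhere in this revert. A small standalone sketch of the same arithmetic, assuming token issue times are supplied directly rather than decoded from Hadoop delegation token identifiers:

    object RenewalTiming {
      // Millis from `nowMs` until `fraction` of the renewal interval has elapsed for the
      // most recently issued token; clamped at 0, which callers treat as "renew now".
      // This mirrors the arithmetic of the removed getTimeFromNowToRenewal, but takes
      // issue timestamps directly instead of decoding Hadoop token identifiers.
      def timeFromNowToRenewal(
          issueDatesMs: Seq[Long],
          renewalIntervalMs: Long,
          fraction: Double,
          nowMs: Long = System.currentTimeMillis()): Long =
        issueDatesMs
          .map(issue => (issue + fraction * renewalIntervalMs).toLong - nowMs)
          .foldLeft(0L)(math.max)

      def main(args: Array[String]): Unit = {
        val now = System.currentTimeMillis()
        val dayMs = 24L * 60 * 60 * 1000
        // Token issued 10 hours ago, 24h renewal interval, renew at 80%: about 9.2h remain.
        println(timeFromNowToRenewal(Seq(now - 10L * 60 * 60 * 1000), dayMs, 0.8, now))
      }
    }
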
http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4b89ede..0d149e7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,10 +401,6 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
       OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
 
-      // Yarn client or cluster
-      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
-      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),
-
       // Other options
       OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
         sysProp = "spark.executor.cores"),

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index c0e4c77..c621b8f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -63,8 +63,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var action: SparkSubmitAction = null
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
   var proxyUser: String = null
-  var principal: String = null
-  var keytab: String = null
 
   // Standalone cluster mode only
   var supervise: Boolean = false
@@ -395,12 +393,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
       case PROXY_USER =>
         proxyUser = value
 
-      case PRINCIPAL =>
-        principal = value
-
-      case KEYTAB =>
-        keytab = value
-
       case HELP =>
         printUsageAndExit(0)
 
@@ -514,13 +506,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |  --num-executors NUM         Number of executors to launch (Default: 2).
         |  --archives ARCHIVES         Comma separated list of archives to be extracted into the
         |                              working directory of each executor.
-        |  --principal PRINCIPAL       Principal to be used to login to KDC, while running on
-        |                              secure HDFS.
-        |  --keytab KEYTAB             The full path to the file that contains the keytab for the
-        |                              principal specified above. This keytab will be copied to
-        |                              the node running the Application Master via the Secure
-        |                              Distributed Cache, for renewing the login tickets and the
-        |                              delegation tokens periodically.
       """.stripMargin
     )
     SparkSubmit.exitFn()

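The two spark-submit options dropped here fed the spark.yarn.principal and spark.yarn.keytab settings used by the rest of this feature. A simplified, illustrative parser sketch showing that flag-to-config mapping (not Spark's actual SparkSubmitArguments handling; in the real code --keytab was later rewritten to a staged file name by the YARN client):

    object KeytabOptionParsing {
      // Illustrative only: the shape of the --principal / --keytab handling removed in
      // this revert, mapping the flags onto the spark.yarn.* keys read elsewhere.
      def parse(args: List[String], acc: Map[String, String] = Map.empty): Map[String, String] =
        args match {
          case "--principal" :: value :: tail => parse(tail, acc + ("spark.yarn.principal" -> value))
          case "--keytab" :: value :: tail    => parse(tail, acc + ("spark.yarn.keytab" -> value))
          case _ :: tail                      => parse(tail, acc)  // ignore unrelated options here
          case Nil                            => acc
        }

      def main(args: Array[String]): Unit = {
        println(parse(List("--principal", "user@EXAMPLE.COM", "--keytab", "/tmp/user.keytab")))
        // Map(spark.yarn.principal -> user@EXAMPLE.COM, spark.yarn.keytab -> /tmp/user.keytab)
      }
    }
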
http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9e0f06a..79aed90 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -26,7 +26,7 @@ import scala.util.{Failure, Success}
 import org.apache.spark.rpc._
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.deploy.{ExecutorDelegationTokenUpdater, SparkHadoopUtil}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.scheduler.TaskDescription
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -168,16 +168,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           driverConf.set(key, value)
         }
       }
-      var tokenUpdaterOption: Option[ExecutorDelegationTokenUpdater] = None
-      if (driverConf.contains("spark.yarn.credentials.file")) {
-        logInfo("Will periodically update credentials from: " +
-          driverConf.get("spark.yarn.credentials.file"))
-        // Periodically update the credentials for this user to ensure HDFS tokens get updated.
-        tokenUpdaterOption =
-          Some(new ExecutorDelegationTokenUpdater(driverConf, SparkHadoopUtil.get.conf))
-        tokenUpdaterOption.get.updateCredentialsIfRequired()
-      }
-
       val env = SparkEnv.createExecutorEnv(
         driverConf, executorId, hostname, port, cores, isLocal = false)
 
@@ -193,7 +183,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
         env.rpcEnv.setupEndpoint("WorkerWatcher", new 
WorkerWatcher(env.rpcEnv, url))
       }
       env.rpcEnv.awaitTermination()
-      tokenUpdaterOption.foreach(_.stop())
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index f107148..7352fa1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,7 +68,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
-
     override protected def log = CoarseGrainedSchedulerBackend.this.log
 
     private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -113,7 +112,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
         }
-
     }
 
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -124,6 +122,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         } else {
           logInfo("Registered executor: " + executorRef + " with ID " + 
executorId)
           context.reply(RegisteredExecutor)
+
           addressToExecutorId(executorRef.address) = executorId
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
@@ -244,7 +243,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         properties += ((key, value))
       }
     }
-
     // TODO (prashant) send conf instead of properties
     driverEndpoint = rpcEnv.setupEndpoint(
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/docs/security.md
----------------------------------------------------------------------
diff --git a/docs/security.md b/docs/security.md
index d4ffa60..c034ba1 100644
--- a/docs/security.md
+++ b/docs/security.md
@@ -32,8 +32,6 @@ SSL must be configured on each node and configured for each component involved i
 ### YARN mode
 The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.
 
-For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely - if YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab and the delegation tokens required for HDFS will be generated periodically so the application can continue writing to HDFS.
-
 ### Standalone mode
 The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
index 2290000..8526d2e 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -69,10 +69,8 @@ class SparkSubmitOptionParser {
   // YARN-only options.
   protected final String ARCHIVES = "--archives";
   protected final String EXECUTOR_CORES = "--executor-cores";
-  protected final String KEYTAB = "--keytab";
-  protected final String NUM_EXECUTORS = "--num-executors";
-  protected final String PRINCIPAL = "--principal";
   protected final String QUEUE = "--queue";
+  protected final String NUM_EXECUTORS = "--num-executors";
 
   /**
   * This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -98,13 +96,11 @@ class SparkSubmitOptionParser {
     { EXECUTOR_MEMORY },
     { FILES },
     { JARS },
-    { KEYTAB },
     { KILL_SUBMISSION },
     { MASTER },
     { NAME },
     { NUM_EXECUTORS },
     { PACKAGES },
-    { PRINCIPAL },
     { PROPERTIES_FILE },
     { PROXY_USER },
     { PY_FILES },

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
deleted file mode 100644
index 9ff0204..0000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.deploy.yarn
-
-import java.security.PrivilegedExceptionAction
-import java.util.concurrent.{Executors, TimeUnit}
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.deploy.SparkHadoopUtil
-
-import org.apache.spark.{Logging, SparkConf}
-import org.apache.spark.util.ThreadUtils
-
-/*
- * The following methods are primarily meant to make sure long-running apps like Spark
- * Streaming apps can run without interruption while writing to secure HDFS. The
- * scheduleLoginFromKeytab method is called on the driver when the
- * CoarseGrainedScheduledBackend starts up. This method wakes up a thread that logs into the KDC
- * once 75% of the renewal interval of the original delegation tokens used for the container
- * has elapsed. It then creates new delegation tokens and writes them to HDFS in a
- * pre-specified location - the prefix of which is specified in the sparkConf by
- * spark.yarn.credentials.file (so the file(s) would be named c-1, c-2 etc. - each update goes
- * to a new file, with a monotonically increasing suffix). After this, the credentials are
- * updated once 75% of the new tokens renewal interval has elapsed.
- *
- * On the executor side, the updateCredentialsIfRequired method is called once 80% of the
- * validity of the original tokens has elapsed. At that time the executor finds the
- * credentials file with the latest timestamp and checks if it has read those credentials
- * before (by keeping track of the suffix of the last file it read). If a new file has
- * appeared, it will read the credentials and update the currently running UGI with it. This
- * process happens again once 80% of the validity of this has expired.
- */
-private[yarn] class AMDelegationTokenRenewer(
-    sparkConf: SparkConf,
-    hadoopConf: Configuration) extends Logging {
-
-  private var lastCredentialsFileSuffix = 0
-
-  private val delegationTokenRenewer =
-    Executors.newSingleThreadScheduledExecutor(
-      ThreadUtils.namedThreadFactory("Delegation Token Refresh Thread"))
-
-  private val hadoopUtil = YarnSparkHadoopUtil.get
-
-  private val daysToKeepFiles = sparkConf.getInt("spark.yarn.credentials.file.retention.days", 5)
-  private val numFilesToKeep = sparkConf.getInt("spark.yarn.credentials.file.retention.count", 5)
-
-  /**
-   * Schedule a login from the keytab and principal set using the --principal and --keytab
-   * arguments to spark-submit. This login happens only when the credentials of the current user
-   * are about to expire. This method reads spark.yarn.principal and spark.yarn.keytab from
-   * SparkConf to do the login. This method is a no-op in non-YARN mode.
-   *
-   */
-  private[spark] def scheduleLoginFromKeytab(): Unit = {
-    val principal = sparkConf.get("spark.yarn.principal")
-    val keytab = sparkConf.get("spark.yarn.keytab")
-
-    /**
-     * Schedule re-login and creation of new tokens. If tokens have already expired, this method
-     * will synchronously create new ones.
-     */
-    def scheduleRenewal(runnable: Runnable): Unit = {
-      val credentials = UserGroupInformation.getCurrentUser.getCredentials
-      val renewalInterval = hadoopUtil.getTimeFromNowToRenewal(sparkConf, 0.75, credentials)
-      // Run now!
-      if (renewalInterval <= 0) {
-        logInfo("HDFS tokens have expired, creating new tokens now.")
-        runnable.run()
-      } else {
-        logInfo(s"Scheduling login from keytab in $renewalInterval millis.")
-        delegationTokenRenewer.schedule(runnable, renewalInterval, TimeUnit.MILLISECONDS)
-      }
-    }
-
-    // This thread periodically runs on the driver to update the delegation tokens on HDFS.
-    val driverTokenRenewerRunnable =
-      new Runnable {
-        override def run(): Unit = {
-          try {
-            writeNewTokensToHDFS(principal, keytab)
-            cleanupOldFiles()
-          } catch {
-            case e: Exception =>
-              // Log the error and try to write new tokens back in an hour
-              logWarning("Failed to write out new credentials to HDFS, will 
try again in an " +
-                "hour! If this happens too often tasks will fail.", e)
-              delegationTokenRenewer.schedule(this, 1, TimeUnit.HOURS)
-              return
-          }
-          scheduleRenewal(this)
-        }
-      }
-    // Schedule update of credentials. This handles the case of updating the tokens right now
-    // as well, since the renenwal interval will be 0, and the thread will get scheduled
-    // immediately.
-    scheduleRenewal(driverTokenRenewerRunnable)
-  }
-
-  // Keeps only files that are newer than daysToKeepFiles days, and deletes everything else. At
-  // least numFilesToKeep files are kept for safety
-  private def cleanupOldFiles(): Unit = {
-    import scala.concurrent.duration._
-    try {
-      val remoteFs = FileSystem.get(hadoopConf)
-      val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
-      val thresholdTime = System.currentTimeMillis() - (daysToKeepFiles days).toMillis
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .dropRight(numFilesToKeep)
-        .takeWhile(_.getModificationTime < thresholdTime)
-        .foreach(x => remoteFs.delete(x.getPath, true))
-    } catch {
-      // Such errors are not fatal, so don't throw. Make sure they are logged though
-      case e: Exception =>
-        logWarning("Error while attempting to cleanup old tokens. If you are seeing many such " +
-          "warnings there may be an issue with your HDFS cluster.")
-    }
-  }
-
-  private def writeNewTokensToHDFS(principal: String, keytab: String): Unit = {
-    // Keytab is copied by YARN to the working directory of the AM, so full path is
-    // not needed.
-
-    // HACK:
-    // HDFS will not issue new delegation tokens, if the Credentials object
-    // passed in already has tokens for that FS even if the tokens are expired (it really only
-    // checks if there are tokens for the service, and not if they are valid). So the only real
-    // way to get new tokens is to make sure a different Credentials object is used each time to
-    // get new tokens and then the new tokens are copied over the the current user's Credentials.
-    // So:
-    // - we login as a different user and get the UGI
-    // - use that UGI to get the tokens (see doAs block below)
-    // - copy the tokens over to the current user's credentials (this will overwrite the tokens
-    // in the current user's Credentials object for this FS).
-    // The login to KDC happens each time new tokens are required, but this is rare enough to not
-    // have to worry about (like once every day or so). This makes this code clearer than having
-    // to login and then relogin every time (the HDFS API may not relogin since we don't use this
-    // UGI directly for HDFS communication.
-    logInfo(s"Attempting to login to KDC using principal: $principal")
-    val keytabLoggedInUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
-    logInfo("Successfully logged into KDC.")
-    val tempCreds = keytabLoggedInUGI.getCredentials
-    val credentialsPath = new Path(sparkConf.get("spark.yarn.credentials.file"))
-    val dst = credentialsPath.getParent
-    keytabLoggedInUGI.doAs(new PrivilegedExceptionAction[Void] {
-      // Get a copy of the credentials
-      override def run(): Void = {
-        val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-        hadoopUtil.obtainTokensForNamenodes(nns, hadoopConf, tempCreds)
-        null
-      }
-    })
-    // Add the temp credentials back to the original ones.
-    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
-    val remoteFs = FileSystem.get(hadoopConf)
-    // If lastCredentialsFileSuffix is 0, then the AM is either started or restarted. If the AM
-    // was restarted, then the lastCredentialsFileSuffix might be > 0, so find the newest file
-    // and update the lastCredentialsFileSuffix.
-    if (lastCredentialsFileSuffix == 0) {
-      hadoopUtil.listFilesSorted(
-        remoteFs, credentialsPath.getParent,
-        credentialsPath.getName, SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-        .lastOption.foreach { status =>
-        lastCredentialsFileSuffix = hadoopUtil.getSuffixForCredentialsPath(status.getPath)
-      }
-    }
-    val nextSuffix = lastCredentialsFileSuffix + 1
-    val tokenPathStr =
-      sparkConf.get("spark.yarn.credentials.file") +
-        SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM + nextSuffix
-    val tokenPath = new Path(tokenPathStr)
-    val tempTokenPath = new Path(tokenPathStr + SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
-    logInfo("Writing out delegation tokens to " + tempTokenPath.toString)
-    val credentials = UserGroupInformation.getCurrentUser.getCredentials
-    credentials.writeTokenStorageFile(tempTokenPath, hadoopConf)
-    logInfo(s"Delegation Tokens written out successfully. Renaming file to 
$tokenPathStr")
-    remoteFs.rename(tempTokenPath, tokenPath)
-    logInfo("Delegation token file rename complete.")
-    lastCredentialsFileSuffix = nextSuffix
-  }
-
-  def stop(): Unit = {
-    delegationTokenRenewer.shutdown()
-  }
-}

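The scheduling skeleton of the deleted renewer, reduced to its essentials: compute the delay to the renewal threshold, run immediately if it has already passed, otherwise schedule a one-shot task that re-schedules itself, and fall back to an hourly retry on failure. The sketch below keeps only that control flow; millisUntilRenewal and renewTokens stand in for the Kerberos/HDFS work done by the real class:

    import java.util.concurrent.{Executors, TimeUnit}

    // Control-flow sketch of the deleted AMDelegationTokenRenewer's scheduling loop.
    // millisUntilRenewal and renewTokens are placeholders for the real Kerberos/HDFS work.
    class RenewalScheduler(millisUntilRenewal: () => Long, renewTokens: () => Unit) {
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      private val renewalTask: Runnable = new Runnable {
        override def run(): Unit = {
          try {
            renewTokens()
          } catch {
            case e: Exception =>
              // On failure, retry in an hour instead of giving up, as the removed code did.
              scheduler.schedule(this, 1, TimeUnit.HOURS)
              return
          }
          // After a successful renewal, schedule the next one.
          scheduleRenewal()
        }
      }

      def scheduleRenewal(): Unit = {
        val delay = millisUntilRenewal()
        if (delay <= 0) renewalTask.run()  // threshold already passed: renew synchronously
        else scheduler.schedule(renewalTask, delay, TimeUnit.MILLISECONDS)
      }

      def stop(): Unit = scheduler.shutdown()
    }
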
http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 82f2b7e..70cb57f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -75,8 +75,6 @@ private[spark] class ApplicationMaster(
   // Fields used in cluster mode.
   private val sparkContextRef = new AtomicReference[SparkContext](null)
 
-  private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
-
   final def run(): Int = {
     try {
       val appAttemptId = client.getAttemptId()
@@ -127,15 +125,6 @@ private[spark] class ApplicationMaster(
       // doAs in order for the credentials to be passed on to the executor containers.
       val securityMgr = new SecurityManager(sparkConf)
 
-      // If the credentials file config is present, we must periodically renew tokens. So create
-      // a new AMDelegationTokenRenewer
-      if (sparkConf.contains("spark.yarn.credentials.file")) {
-        delegationTokenRenewerOption = Some(new AMDelegationTokenRenewer(sparkConf, yarnConf))
-        // If a principal and keytab have been set, use that to create new credentials for executors
-        // periodically
-        delegationTokenRenewerOption.foreach(_.scheduleLoginFromKeytab())
-      }
-
       if (isClusterMode) {
         runDriver(securityMgr)
       } else {
@@ -200,7 +189,6 @@ private[spark] class ApplicationMaster(
           logDebug("shutting down user thread")
           userClassThread.interrupt()
         }
-        if (!inShutdown) delegationTokenRenewerOption.foreach(_.stop())
       }
     }
   }
@@ -247,12 +235,12 @@ private[spark] class ApplicationMaster(
       host: String,
       port: String,
       isClusterMode: Boolean): Unit = {
-    val driverEndpoint = rpcEnv.setupEndpointRef(
+    val driverEndpont = rpcEnv.setupEndpointRef(
       SparkEnv.driverActorSystemName,
       RpcAddress(host, port.toInt),
       YarnSchedulerBackend.ENDPOINT_NAME)
     amEndpoint =
-      rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, 
isClusterMode))
+      rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpont, 
isClusterMode))
   }
 
   private def runDriver(securityMgr: SecurityManager): Unit = {
@@ -506,7 +494,6 @@ private[spark] class ApplicationMaster(
 
     override def onStart(): Unit = {
       driver.send(RegisterClusterManager(self))
-
     }
 
     override def receive: PartialFunction[Any, Unit] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index bb126a4..4abcf73 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,11 +17,9 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.{ByteArrayInputStream, DataInputStream, File, FileOutputStream}
+import java.io.{File, FileOutputStream}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
-import java.security.PrivilegedExceptionAction
-import java.util.UUID
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConversions._
@@ -38,6 +36,7 @@ import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifie
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.Master
 import org.apache.hadoop.mapreduce.MRJobConfig
 import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{TokenIdentifier, Token}
@@ -51,8 +50,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
 import org.apache.hadoop.yarn.util.Records
 
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 private[spark] class Client(
@@ -70,13 +69,11 @@ private[spark] class Client(
 
   private val yarnClient = YarnClient.createYarnClient
   private val yarnConf = new YarnConfiguration(hadoopConf)
-  private var credentials: Credentials = null
+  private val credentials = UserGroupInformation.getCurrentUser.getCredentials
   private val amMemoryOverhead = args.amMemoryOverhead // MB
   private val executorMemoryOverhead = args.executorMemoryOverhead // MB
   private val distCacheMgr = new ClientDistributedCacheManager()
   private val isClusterMode = args.isClusterMode
-
-  private var loginFromKeytab = false
   private val fireAndForget = isClusterMode &&
     !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
 
@@ -91,8 +88,6 @@ private[spark] class Client(
    * available in the alpha API.
    */
   def submitApplication(): ApplicationId = {
-    // Setup the credentials before doing anything else, so we have don't have issues at any point.
-    setupCredentials()
     yarnClient.init(yarnConf)
     yarnClient.start()
 
@@ -224,12 +219,12 @@ private[spark] class Client(
     // and add them as local resources to the application master.
     val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
+    val nns = getNameNodesToAccess(sparkConf) + dst
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
     // multiple times, YARN will fail to launch containers for the app with an internal
     // error.
     val distributedUris = new HashSet[String]
+    obtainTokensForNamenodes(nns, hadoopConf, credentials)
     obtainTokenForHiveMetastore(hadoopConf, credentials)
     obtainTokenForHBase(hadoopConf, credentials)
 
@@ -248,20 +243,6 @@ private[spark] class Client(
           "for alternatives.")
     }
 
-    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
-    // HDFS, and setup the relevant environment vars, so the AM can login again.
-    if (loginFromKeytab) {
-      logInfo("To enable the AM to login from keytab, credentials are being 
copied over to the AM" +
-        " via the YARN Secure Distributed Cache.")
-      val localUri = new URI(args.keytab)
-      val localPath = getQualifiedLocalPath(localUri, hadoopConf)
-      val destinationPath = copyFileToRemote(dst, localPath, replication)
-      val destFs = FileSystem.get(destinationPath.toUri(), hadoopConf)
-      distCacheMgr.addResource(
-        destFs, hadoopConf, destinationPath, localResources, LocalResourceType.FILE,
-        sparkConf.get("spark.yarn.keytab"), statCache, appMasterOnly = true)
-    }
-
     def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
@@ -405,28 +386,6 @@ private[spark] class Client(
   }
 
   /**
-   * Get the renewal interval for tokens.
-   */
-  private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
-    // We cannot use the tokens generated above since those have renewer yarn. Trying to renew
-    // those will fail with an access control issue. So create new tokens with the logged in
-    // user as renewer.
-    val creds = new Credentials()
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
-      nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
-    val t = creds.getAllTokens
-      .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .head
-    val newExpiration = t.renew(hadoopConf)
-    val identifier = new DelegationTokenIdentifier()
-    identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-    val interval = newExpiration - identifier.getIssueDate
-    logInfo(s"Renewal Interval set to $interval")
-    interval
-  }
-
-  /**
    * Set up the environment for launching our ApplicationMaster container.
    */
   private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = {
@@ -437,16 +396,7 @@ private[spark] class Client(
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = 
UserGroupInformation.getCurrentUser().getShortUserName()
-    if (loginFromKeytab) {
-      val remoteFs = FileSystem.get(hadoopConf)
-      val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
-      val credentialsFile = "credentials-" + UUID.randomUUID().toString
-      sparkConf.set(
-        "spark.yarn.credentials.file", new Path(stagingDirPath, 
credentialsFile).toString)
-      logInfo(s"Credentials file set to: $credentialsFile")
-      val renewalInterval = getTokenRenewalInterval(stagingDirPath)
-      sparkConf.set("spark.yarn.token.renewal.interval", 
renewalInterval.toString)
-    }
+
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
@@ -511,6 +461,7 @@ private[spark] class Client(
   private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
     : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
+
     val appId = newAppResponse.getApplicationId
     val appStagingDir = getAppStagingDir(appId)
     val localResources = prepareLocalResources(appStagingDir)
@@ -681,24 +632,6 @@ private[spark] class Client(
     amContainer
   }
 
-  def setupCredentials(): Unit = {
-    if (args.principal != null) {
-      require(args.keytab != null, "Keytab must be specified when principal is 
specified.")
-      logInfo("Attempting to login to the Kerberos" +
-        s" using principal: ${args.principal} and keytab: ${args.keytab}")
-      val f = new File(args.keytab)
-      // Generate a file name that can be used for the keytab file, that does not conflict
-      // with any user file.
-      val keytabFileName = f.getName + "-" + UUID.randomUUID().toString
-      UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
-      loginFromKeytab = true
-      sparkConf.set("spark.yarn.keytab", keytabFileName)
-      sparkConf.set("spark.yarn.principal", args.principal)
-      logInfo("Successfully logged into the KDC.")
-    }
-    credentials = UserGroupInformation.getCurrentUser.getCredentials
-  }
-
   /**
    * Report the state of an application until it has exited, either successfully or
    * due to some failure, then return a pair of the yarn application state (FINISHED, FAILED,
@@ -1055,6 +988,46 @@ object Client extends Logging {
     YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
 
   /**
+   * Get the list of namenodes the user may access.
+   */
+  private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
+    sparkConf.get("spark.yarn.access.namenodes", "")
+      .split(",")
+      .map(_.trim())
+      .filter(!_.isEmpty)
+      .map(new Path(_))
+      .toSet
+  }
+
+  private[yarn] def getTokenRenewer(conf: Configuration): String = {
+    val delegTokenRenewer = Master.getMasterPrincipal(conf)
+    logDebug("delegation token renewer is: " + delegTokenRenewer)
+    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
+      val errorMessage = "Can't get Master Kerberos principal for use as 
renewer"
+      logError(errorMessage)
+      throw new SparkException(errorMessage)
+    }
+    delegTokenRenewer
+  }
+
+  /**
+   * Obtains tokens for the namenodes passed in and adds them to the credentials.
+   */
+  private def obtainTokensForNamenodes(
+      paths: Set[Path],
+      conf: Configuration,
+      creds: Credentials): Unit = {
+    if (UserGroupInformation.isSecurityEnabled()) {
+      val delegTokenRenewer = getTokenRenewer(conf)
+      paths.foreach { dst =>
+        val dstFs = dst.getFileSystem(conf)
+        logDebug("getting token for namenode: " + dst)
+        dstFs.addDelegationTokens(delegTokenRenewer, creds)
+      }
+    }
+  }
+
+  /**
    * Obtains token for the Hive metastore and adds them to the credentials.
    */
   private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {

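The re-added Client.getNameNodesToAccess splits the comma-separated spark.yarn.access.namenodes value, trims whitespace, and drops empty entries, which is what the restored ClientSuite tests later in this patch exercise. A plain-Scala equivalent of that parsing, using strings instead of Hadoop Path objects:

    object NameNodeConfigParsing {
      // Plain-string equivalent of the parsing done by the re-added Client.getNameNodesToAccess:
      // split on commas, trim, and drop empty entries.
      def parseNameNodes(confValue: String): Set[String] =
        confValue.split(",").map(_.trim).filter(_.nonEmpty).toSet

      def main(args: Array[String]): Unit = {
        println(parseNameNodes(""))                                 // Set()
        println(parseNameNodes("hdfs://nn1:8032, "))                // Set(hdfs://nn1:8032)
        println(parseNameNodes("hdfs://nn1:8032,hdfs://nn2:8032"))  // Set(hdfs://nn1:8032, hdfs://nn2:8032)
      }
    }
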
http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index 5653c9f..1423533 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -42,8 +42,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
   var amCores: Int = 1
   var appName: String = "Spark"
   var priority = 0
-  var principal: String = null
-  var keytab: String = null
   def isClusterMode: Boolean = userClass != null
 
   private var driverMemory: Int = 512 // MB
@@ -233,14 +231,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
           archives = value
           args = tail
 
-        case ("--principal") :: value :: tail =>
-          principal = value
-          args = tail
-
-        case ("--keytab") :: value :: tail =>
-          keytab = value
-          args = tail
-
         case Nil =>
 
         case _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index a5c454e..5881dc5 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -17,26 +17,25 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io._
+import java.io.File
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
 import scala.collection.mutable.HashMap
 import scala.util.Try
 
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs._
 import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.{Master, JobConf}
+import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
+import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.util.Utils
 
 /**
@@ -83,48 +82,6 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     if (credentials != null) credentials.getSecretKey(new Text(key)) else null
   }
 
-  /**
-   * Get the list of namenodes the user may access.
-   */
-  def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = {
-    sparkConf.get("spark.yarn.access.namenodes", "")
-      .split(",")
-      .map(_.trim())
-      .filter(!_.isEmpty)
-      .map(new Path(_))
-      .toSet
-  }
-
-  def getTokenRenewer(conf: Configuration): String = {
-    val delegTokenRenewer = Master.getMasterPrincipal(conf)
-    logDebug("delegation token renewer is: " + delegTokenRenewer)
-    if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-      val errorMessage = "Can't get Master Kerberos principal for use as 
renewer"
-      logError(errorMessage)
-      throw new SparkException(errorMessage)
-    }
-    delegTokenRenewer
-  }
-
-  /**
-   * Obtains tokens for the namenodes passed in and adds them to the credentials.
-   */
-  def obtainTokensForNamenodes(
-    paths: Set[Path],
-    conf: Configuration,
-    creds: Credentials,
-    renewer: Option[String] = None
-  ): Unit = {
-    if (UserGroupInformation.isSecurityEnabled()) {
-      val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
-      paths.foreach { dst =>
-        val dstFs = dst.getFileSystem(conf)
-        logInfo("getting token for namenode: " + dst)
-        dstFs.addDelegationTokens(delegTokenRenewer, creds)
-      }
-    }
-  }
-
 }
 
 object YarnSparkHadoopUtil {
@@ -143,14 +100,6 @@ object YarnSparkHadoopUtil {
   // request types (like map/reduce in hadoop for example)
   val RM_REQUEST_PRIORITY = Priority.newInstance(1)
 
-  def get: YarnSparkHadoopUtil = {
-    val yarnMode = java.lang.Boolean.valueOf(
-      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if (!yarnMode) {
-      throw new SparkException("YarnSparkHadoopUtil is not available in 
non-YARN mode!")
-    }
-    SparkHadoopUtil.get.asInstanceOf[YarnSparkHadoopUtil]
-  }
   /**
    * Add a path variable to the given environment map.
    * If the map already contains this key, append the value to the existing value instead.
@@ -263,4 +212,3 @@ object YarnSparkHadoopUtil {
     classPathSeparatorField.get(null).asInstanceOf[String]
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 508819e..a51c200 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,6 +151,57 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     }
   }
 
+  test("check access nns empty") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "")
+    val nns = Client.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns unset") {
+    val sparkConf = new SparkConf()
+    val nns = Client.getNameNodesToAccess(sparkConf)
+    nns should be(Set())
+  }
+
+  test("check access nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
+    val nns = Client.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access nns space") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
+    val nns = Client.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032")))
+  }
+
+  test("check access two nns") {
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.access.namenodes", 
"hdfs://nn1:8032,hdfs://nn2:8032")
+    val nns = Client.getNameNodesToAccess(sparkConf)
+    nns should be(Set(new Path("hdfs://nn1:8032"), new 
Path("hdfs://nn2:8032")))
+  }
+
+  test("check token renewer") {
+    val hadoopConf = new Configuration()
+    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
+    hadoopConf.set("yarn.resourcemanager.principal", 
"yarn/myrm:8...@sparktest.com")
+    val renewer = Client.getTokenRenewer(hadoopConf)
+    renewer should be ("yarn/myrm:8...@sparktest.com")
+  }
+
+  test("check token renewer default") {
+    val hadoopConf = new Configuration()
+    val caught =
+      intercept[SparkException] {
+        Client.getTokenRenewer(hadoopConf)
+      }
+    assert(caught.getMessage === "Can't get Master Kerberos principal for use 
as renewer")
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index d3c606e..3877da4 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -86,7 +86,6 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     tempDir = Utils.createTempDir()
     logConfDir = new File(tempDir, "log4j")
     logConfDir.mkdir()
-    System.setProperty("SPARK_YARN_MODE", "true")
 
     val logConfFile = new File(logConfDir, "log4j.properties")
     Files.write(LOG4J_CONF, logConfFile, UTF_8)
@@ -129,7 +128,6 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
 
   override def afterAll() {
     yarnCluster.stop()
-    System.clearProperty("SPARK_YARN_MODE")
     super.afterAll()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e0628f2f/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
index e10b985..9395316 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark.deploy.yarn
 import java.io.{File, IOException}
 
 import com.google.common.io.{ByteStreams, Files}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
 import org.apache.hadoop.yarn.api.ApplicationConstants
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -29,7 +27,7 @@ import org.scalatest.{FunSuite, Matchers}
 
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.util.Utils
 
 
@@ -175,62 +173,4 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
       YarnSparkHadoopUtil.getClassPathSeparator() should be (":")
     }
   }
-
-  test("check access nns empty") {
-    val sparkConf = new SparkConf()
-    val util = new YarnSparkHadoopUtil
-    sparkConf.set("spark.yarn.access.namenodes", "")
-    val nns = util.getNameNodesToAccess(sparkConf)
-    nns should be(Set())
-  }
-
-  test("check access nns unset") {
-    val sparkConf = new SparkConf()
-    val util = new YarnSparkHadoopUtil
-    val nns = util.getNameNodesToAccess(sparkConf)
-    nns should be(Set())
-  }
-
-  test("check access nns") {
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032")
-    val util = new YarnSparkHadoopUtil
-    val nns = util.getNameNodesToAccess(sparkConf)
-    nns should be(Set(new Path("hdfs://nn1:8032")))
-  }
-
-  test("check access nns space") {
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.access.namenodes", "hdfs://nn1:8032, ")
-    val util = new YarnSparkHadoopUtil
-    val nns = util.getNameNodesToAccess(sparkConf)
-    nns should be(Set(new Path("hdfs://nn1:8032")))
-  }
-
-  test("check access two nns") {
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.access.namenodes", 
"hdfs://nn1:8032,hdfs://nn2:8032")
-    val util = new YarnSparkHadoopUtil
-    val nns = util.getNameNodesToAccess(sparkConf)
-    nns should be(Set(new Path("hdfs://nn1:8032"), new 
Path("hdfs://nn2:8032")))
-  }
-
-  test("check token renewer") {
-    val hadoopConf = new Configuration()
-    hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
-    hadoopConf.set("yarn.resourcemanager.principal", 
"yarn/myrm:8...@sparktest.com")
-    val util = new YarnSparkHadoopUtil
-    val renewer = util.getTokenRenewer(hadoopConf)
-    renewer should be ("yarn/myrm:8...@sparktest.com")
-  }
-
-  test("check token renewer default") {
-    val hadoopConf = new Configuration()
-    val util = new YarnSparkHadoopUtil
-    val caught =
-      intercept[SparkException] {
-        util.getTokenRenewer(hadoopConf)
-      }
-    assert(caught.getMessage === "Can't get Master Kerberos principal for use 
as renewer")
-  }
 }

