Repository: spark
Updated Branches:
  refs/heads/branch-1.4 de49916ab -> e91d87e66


[SPARK-8657] [YARN] Fail to upload resource to viewfs

Fail to upload resource to viewfs in spark-1.4
JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657

Author: Tao Li <li...@sogou-inc.com>

Closes #7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits:

65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs

(cherry picked from commit 26d9b6b8cae9ac6593f78ab98dd45a25d03cf71c)
Signed-off-by: Sean Owen <so...@cloudera.com>

# Conflicts:
#       yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e91d87e6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e91d87e6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e91d87e6

Branch: refs/heads/branch-1.4
Commit: e91d87e6631989527c88d3524152d77e92267aea
Parents: de49916
Author: Tao Li <li...@sogou-inc.com>
Authored: Wed Jul 8 19:02:24 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Jul 8 19:06:39 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 51 ++++++++++++++++++++
 1 file changed, 51 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e91d87e6/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9296e79..38e5926 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -304,6 +304,57 @@ private[spark] class Client(
     }
 
     /**
+     * Distribute a file to the cluster.
+     *
+     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
+     * to HDFS (if not already there) and added to the application's distributed cache.
+     *
+     * @param path URI of the file to distribute.
+     * @param resType Type of resource being distributed.
+     * @param destName Name of the file in the distributed cache.
+     * @param targetDir Subdirectory where to place the file.
+     * @param appMasterOnly Whether to distribute only to the AM.
+     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
+     *         localized path for non-local paths, or the input `path` for local paths.
+     *         The localized path will be null if the URI has already been added to the cache.
+     */
+    def distribute(
+        path: String,
+        resType: LocalResourceType = LocalResourceType.FILE,
+        destName: Option[String] = None,
+        targetDir: Option[String] = None,
+        appMasterOnly: Boolean = false): (Boolean, String) = {
+      val localURI = new URI(path.trim())
+      if (localURI.getScheme != LOCAL_SCHEME) {
+        if (addDistributedUri(localURI)) {
+          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
+          val linkname = targetDir.map(_ + "/").getOrElse("") +
+            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
+          val destPath = copyFileToRemote(dst, localPath, replication)
+          distCacheMgr.addResource(
+            fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
+            appMasterOnly = appMasterOnly)
+          (false, linkname)
+        } else {
+          (false, null)
+        }
+      } else {
+        (true, path.trim())
+      }
+    }
+
+    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+    // HDFS, and setup the relevant environment vars, so the AM can login again.
+    if (loginFromKeytab) {
+      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+        " via the YARN Secure Distributed Cache.")
+      val (_, localizedPath) = distribute(args.keytab,
+        destName = Some(sparkConf.get("spark.yarn.keytab")),
+        appMasterOnly = true)
+      require(localizedPath != null, "Keytab file already distributed.")
+    }
+
+    /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
      * Each resource is represented by a 3-tuple of:


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to