Repository: spark Updated Branches: refs/heads/branch-1.4 de49916ab -> e91d87e66
[SPARK-8657] [YARN] Fail to upload resource to viewfs Fail to upload resource to viewfs in spark-1.4 JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657 Author: Tao Li <li...@sogou-inc.com> Closes #7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits: 65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs (cherry picked from commit 26d9b6b8cae9ac6593f78ab98dd45a25d03cf71c) Signed-off-by: Sean Owen <so...@cloudera.com> # Conflicts: # yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e91d87e6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e91d87e6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e91d87e6 Branch: refs/heads/branch-1.4 Commit: e91d87e6631989527c88d3524152d77e92267aea Parents: de49916 Author: Tao Li <li...@sogou-inc.com> Authored: Wed Jul 8 19:02:24 2015 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Jul 8 19:06:39 2015 +0100 ---------------------------------------------------------------------- .../org/apache/spark/deploy/yarn/Client.scala | 51 ++++++++++++++++++++ 1 file changed, 51 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/e91d87e6/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---------------------------------------------------------------------- diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 9296e79..38e5926 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -304,6 +304,57 @@ private[spark] class Client( } /** + * Distribute a file to the cluster. + * + * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied + * to HDFS (if not already there) and added to the application's distributed cache. + * + * @param path URI of the file to distribute. + * @param resType Type of resource being distributed. + * @param destName Name of the file in the distributed cache. + * @param targetDir Subdirectory where to place the file. + * @param appMasterOnly Whether to distribute only to the AM. + * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the + * localized path for non-local paths, or the input `path` for local paths. + * The localized path will be null if the URI has already been added to the cache. + */ + def distribute( + path: String, + resType: LocalResourceType = LocalResourceType.FILE, + destName: Option[String] = None, + targetDir: Option[String] = None, + appMasterOnly: Boolean = false): (Boolean, String) = { + val localURI = new URI(path.trim()) + if (localURI.getScheme != LOCAL_SCHEME) { + if (addDistributedUri(localURI)) { + val localPath = getQualifiedLocalPath(localURI, hadoopConf) + val linkname = targetDir.map(_ + "/").getOrElse("") + + destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName()) + val destPath = copyFileToRemote(dst, localPath, replication) + distCacheMgr.addResource( + fs, hadoopConf, destPath, localResources, resType, linkname, statCache, + appMasterOnly = appMasterOnly) + (false, linkname) + } else { + (false, null) + } + } else { + (true, path.trim()) + } + } + + // If we passed in a keytab, make sure we copy the keytab to the staging directory on + // HDFS, and setup the relevant environment vars, so the AM can login again. + if (loginFromKeytab) { + logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + + " via the YARN Secure Distributed Cache.") + val (_, localizedPath) = distribute(args.keytab, + destName = Some(sparkConf.get("spark.yarn.keytab")), + appMasterOnly = true) + require(localizedPath != null, "Keytab file already distributed.") + } + + /** * Copy the given main resource to the distributed cache if the scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. * Each resource is represented by a 3-tuple of: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org