Repository: spark
Updated Branches:
  refs/heads/branch-1.4 e91d87e66 -> e4313db38
[SPARK-8657] [YARN] [HOTFIX] Fail to upload resource to viewfs

Fail to upload resource to viewfs in spark-1.4
JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657

Author: Tao Li <li...@sogou-inc.com>

Closes #7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits:

65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs

(cherry picked from commit 26d9b6b8cae9ac6593f78ab98dd45a25d03cf71c)
Signed-off-by: Sean Owen <so...@cloudera.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4313db3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4313db3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4313db3

Branch: refs/heads/branch-1.4
Commit: e4313db38e81f6288f1704c22e17d0c6e81b4d75
Parents: e91d87e
Author: Tao Li <li...@sogou-inc.com>
Authored: Wed Jul 8 19:02:24 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Jul 8 19:23:29 2015 +0100

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala | 57 ++------------------
 1 file changed, 4 insertions(+), 53 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/e4313db3/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 38e5926..cc0aa45 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -304,57 +304,6 @@ private[spark] class Client(
     }
 
     /**
-     * Distribute a file to the cluster.
-     *
-     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
-     * to HDFS (if not already there) and added to the application's distributed cache.
-     *
-     * @param path URI of the file to distribute.
-     * @param resType Type of resource being distributed.
-     * @param destName Name of the file in the distributed cache.
-     * @param targetDir Subdirectory where to place the file.
-     * @param appMasterOnly Whether to distribute only to the AM.
-     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
-     *         localized path for non-local paths, or the input `path` for local paths.
-     *         The localized path will be null if the URI has already been added to the cache.
-     */
-    def distribute(
-        path: String,
-        resType: LocalResourceType = LocalResourceType.FILE,
-        destName: Option[String] = None,
-        targetDir: Option[String] = None,
-        appMasterOnly: Boolean = false): (Boolean, String) = {
-      val localURI = new URI(path.trim())
-      if (localURI.getScheme != LOCAL_SCHEME) {
-        if (addDistributedUri(localURI)) {
-          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
-          val linkname = targetDir.map(_ + "/").getOrElse("") +
-            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
-          val destPath = copyFileToRemote(dst, localPath, replication)
-          distCacheMgr.addResource(
-            fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
-            appMasterOnly = appMasterOnly)
-          (false, linkname)
-        } else {
-          (false, null)
-        }
-      } else {
-        (true, path.trim())
-      }
-    }
-
-    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
-    // HDFS, and setup the relevant environment vars, so the AM can login again.
-    if (loginFromKeytab) {
-      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
-        " via the YARN Secure Distributed Cache.")
-      val (_, localizedPath) = distribute(args.keytab,
-        destName = Some(sparkConf.get("spark.yarn.keytab")),
-        appMasterOnly = true)
-      require(localizedPath != null, "Keytab file already distributed.")
-    }
-
-    /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
      * Each resource is represented by a 3-tuple of:
@@ -389,7 +338,8 @@ private[spark] class Client(
     createConfArchive().foreach { file =>
       require(addDistributedUri(file.toURI()))
       val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
-      distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
+      val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+      distCacheMgr.addResource(destFs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
         LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
     }
 
@@ -414,8 +364,9 @@ private[spark] class Client(
               val localPath = new Path(localURI)
               val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
               val destPath = copyFileToRemote(dst, localPath, replication)
+              val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
               distCacheMgr.addResource(
-                fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
+                destFs, hadoopConf, destPath, localResources, resType, linkname, statCache)
               if (addToClasspath) {
                 cachedSecondaryJarLinks += linkname
               }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org