Repository: spark Updated Branches: refs/heads/master 1f24ceee6 -> 1813c4a8d
[SPARK-21714][CORE][YARN] Avoiding re-uploading remote resources in yarn client mode ## What changes were proposed in this pull request? With SPARK-10643, Spark supports downloading resources from remote storage in client deploy mode. But the implementation overwrites the variables that represent added resources (like `args.jars`, `args.pyFiles`) with local paths, and the YARN client then uses these local paths to re-upload the resources to the distributed cache. This is unnecessary and breaks the semantics of putting resources in a shared FS, so this change proposes to fix it. ## How was this patch tested? This was manually verified with jars and pyFiles in local and remote storage, in both client and cluster mode. Author: jerryshao <ss...@hortonworks.com> Closes #18962 from jerryshao/SPARK-21714. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1813c4a8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1813c4a8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1813c4a8 Branch: refs/heads/master Commit: 1813c4a8dd4388fe76a4ec772c9be151be0f60a1 Parents: 1f24cee Author: jerryshao <ss...@hortonworks.com> Authored: Fri Aug 25 09:57:53 2017 -0700 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Fri Aug 25 09:57:53 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/deploy/SparkSubmit.scala | 64 +++++++++++------- .../apache/spark/internal/config/package.scala | 2 +- .../scala/org/apache/spark/util/Utils.scala | 25 ++++--- .../apache/spark/deploy/SparkSubmitSuite.scala | 70 ++++++++++++++++---- .../main/scala/org/apache/spark/repl/Main.scala | 2 +- 5 files changed, 114 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e569251..548149a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -212,14 +212,20 @@ object SparkSubmit extends CommandLineUtils { /** * Prepare the environment for submitting an application. - * This returns a 4-tuple: - * (1) the arguments for the child process, - * (2) a list of classpath entries for the child, - * (3) a map of system properties, and - * (4) the main class for the child + * + * @param args the parsed SparkSubmitArguments used for environment preparation. + * @param conf the Hadoop Configuration, this argument will only be set in unit test. + * @return a 4-tuple: + * (1) the arguments for the child process, + * (2) a list of classpath entries for the child, + * (3) a map of system properties, and + * (4) the main class for the child + * * Exposed for testing. */ - private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments) + private[deploy] def prepareSubmitEnvironment( + args: SparkSubmitArguments, + conf: Option[HadoopConfiguration] = None) : (Seq[String], Seq[String], Map[String, String], String) = { // Return values val childArgs = new ArrayBuffer[String]() @@ -322,7 +328,7 @@ object SparkSubmit extends CommandLineUtils { } } - val hadoopConf = new HadoopConfiguration() + val hadoopConf = conf.getOrElse(new HadoopConfiguration()) val targetDir = DependencyUtils.createTempDir() // Resolve glob path for different resources. @@ -332,19 +338,21 @@ object SparkSubmit extends CommandLineUtils { args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull // In client mode, download remote files. 
+ var localPrimaryResource: String = null + var localJars: String = null + var localPyFiles: String = null if (deployMode == CLIENT) { - args.primaryResource = Option(args.primaryResource).map { + localPrimaryResource = Option(args.primaryResource).map { downloadFile(_, targetDir, args.sparkProperties, hadoopConf) }.orNull - args.jars = Option(args.jars).map { + localJars = Option(args.jars).map { downloadFileList(_, targetDir, args.sparkProperties, hadoopConf) }.orNull - args.pyFiles = Option(args.pyFiles).map { + localPyFiles = Option(args.pyFiles).map { downloadFileList(_, targetDir, args.sparkProperties, hadoopConf) }.orNull } - // If we're running a python app, set the main class to our specific python runner if (args.isPython && deployMode == CLIENT) { if (args.primaryResource == PYSPARK_SHELL) { @@ -353,7 +361,7 @@ object SparkSubmit extends CommandLineUtils { // If a python file is provided, add it to the child arguments and list of files to deploy. // Usage: PythonAppRunner <main python file> <extra python files> [app arguments] args.mainClass = "org.apache.spark.deploy.PythonRunner" - args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs + args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs if (clusterManager != YARN) { // The YARN backend distributes the primary file differently, so don't merge it. args.files = mergeFileLists(args.files, args.primaryResource) @@ -363,8 +371,8 @@ object SparkSubmit extends CommandLineUtils { // The YARN backend handles python files differently, so don't merge the lists. args.files = mergeFileLists(args.files, args.pyFiles) } - if (args.pyFiles != null) { - sysProps("spark.submit.pyFiles") = args.pyFiles + if (localPyFiles != null) { + sysProps("spark.submit.pyFiles") = localPyFiles } } @@ -418,7 +426,7 @@ object SparkSubmit extends CommandLineUtils { // If an R file is provided, add it to the child arguments and list of files to deploy. 
// Usage: RRunner <main R file> [app arguments] args.mainClass = "org.apache.spark.deploy.RRunner" - args.childArgs = ArrayBuffer(args.primaryResource) ++ args.childArgs + args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs args.files = mergeFileLists(args.files, args.primaryResource) } } @@ -463,6 +471,7 @@ object SparkSubmit extends CommandLineUtils { OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.queue"), OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.instances"), + OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.pyFiles"), OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.jars"), OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.files"), OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, sysProp = "spark.yarn.dist.archives"), @@ -486,15 +495,28 @@ object SparkSubmit extends CommandLineUtils { sysProp = "spark.driver.cores"), OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER, sysProp = "spark.driver.supervise"), - OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy") + OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, sysProp = "spark.jars.ivy"), + + // An internal option used only for spark-shell to add user jars to repl's classloader, + // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to + // remote jars, so adding a new option to only specify local jars for spark-shell internally. + OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, sysProp = "spark.repl.local.jars") ) // In client mode, launch the application main class directly // In addition, add the main application jar and any added jars (if any) to the classpath - // Also add the main application jar and any added jars to classpath in case YARN client - // requires these jars. 
- if (deployMode == CLIENT || isYarnCluster) { + if (deployMode == CLIENT) { childMainClass = args.mainClass + if (localPrimaryResource != null && isUserJar(localPrimaryResource)) { + childClasspath += localPrimaryResource + } + if (localJars != null) { childClasspath ++= localJars.split(",") } + } + // Add the main application jar and any added jars to classpath in case YARN client + // requires these jars. + // This assumes both primaryResource and user jars are local jars, otherwise it will not be + // added to the classpath of YARN client. + if (isYarnCluster) { if (isUserJar(args.primaryResource)) { childClasspath += args.primaryResource } @@ -551,10 +573,6 @@ object SparkSubmit extends CommandLineUtils { if (args.isPython) { sysProps.put("spark.yarn.isPython", "true") } - - if (args.pyFiles != null) { - sysProps("spark.submit.pyFiles") = args.pyFiles - } } // assure a keytab is available from any place in a JVM http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 0457a66..0d3769a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -87,7 +87,7 @@ package object config { .intConf .createOptional - private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles") + private[spark] val PY_FILES = ConfigBuilder("spark.yarn.dist.pyFiles") .internal() .stringConf .toSequence http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala index 900a619..3dce76c 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2601,18 +2601,23 @@ private[spark] object Utils extends Logging { } /** - * In YARN mode this method returns a union of the jar files pointed by "spark.jars" and the - * "spark.yarn.dist.jars" properties, while in other modes it returns the jar files pointed by - * only the "spark.jars" property. + * Return the jar files pointed by the "spark.jars" property. Spark internally will distribute + * these jars through file server. In the YARN mode, it will return an empty list, since YARN + * has its own mechanism to distribute jars. */ - def getUserJars(conf: SparkConf, isShell: Boolean = false): Seq[String] = { + def getUserJars(conf: SparkConf): Seq[String] = { val sparkJars = conf.getOption("spark.jars") - if (conf.get("spark.master") == "yarn" && isShell) { - val yarnJars = conf.getOption("spark.yarn.dist.jars") - unionFileLists(sparkJars, yarnJars).toSeq - } else { - sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten - } + sparkJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten + } + + /** + * Return the local jar files which will be added to REPL's classpath. These jar files are + * specified by --jars (spark.jars) or --packages, remote jars will be downloaded to local by + * SparkSubmit at first. 
+ */ + def getLocalUserJarsForShell(conf: SparkConf): Seq[String] = { + val localJars = conf.getOption("spark.repl.local.jars") + localJars.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten } private[spark] val REDACTION_REPLACEMENT_TEXT = "*********(redacted)" http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 08ba41f..95137c8 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -29,7 +29,7 @@ import scala.io.Source import com.google.common.io.ByteStreams import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, Path, RawLocalFileSystem} import org.scalatest.{BeforeAndAfterEach, Matchers} import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ @@ -762,7 +762,7 @@ class SparkSubmitSuite (Set(jar1.toURI.toString, jar2.toURI.toString)) sysProps("spark.yarn.dist.files").split(",").toSet should be (Set(file1.toURI.toString, file2.toURI.toString)) - sysProps("spark.submit.pyFiles").split(",").toSet should be + sysProps("spark.yarn.dist.pyFiles").split(",").toSet should be (Set(pyFile1.getAbsolutePath, pyFile2.getAbsolutePath)) sysProps("spark.yarn.dist.archives").split(",").toSet should be (Set(archive1.toURI.toString, archive2.toURI.toString)) @@ -802,10 +802,7 @@ class SparkSubmitSuite test("downloadFile - file doesn't exist") { val hadoopConf = new Configuration() val tmpDir = Utils.createTempDir() - // Set s3a implementation to local file system for testing. 
- hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem") - // Disable file system impl cache to make sure the test file system is picked up. - hadoopConf.set("fs.s3a.impl.disable.cache", "true") + updateConfWithFakeS3Fs(hadoopConf) intercept[FileNotFoundException] { SparkSubmit.downloadFile("s3a:/no/such/file", tmpDir, mutable.Map.empty, hadoopConf) } @@ -826,10 +823,7 @@ class SparkSubmitSuite FileUtils.write(jarFile, content) val hadoopConf = new Configuration() val tmpDir = Files.createTempDirectory("tmp").toFile - // Set s3a implementation to local file system for testing. - hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem") - // Disable file system impl cache to make sure the test file system is picked up. - hadoopConf.set("fs.s3a.impl.disable.cache", "true") + updateConfWithFakeS3Fs(hadoopConf) val sourcePath = s"s3a://${jarFile.getAbsolutePath}" val outputPath = SparkSubmit.downloadFile(sourcePath, tmpDir, mutable.Map.empty, hadoopConf) @@ -844,10 +838,7 @@ class SparkSubmitSuite FileUtils.write(jarFile, content) val hadoopConf = new Configuration() val tmpDir = Files.createTempDirectory("tmp").toFile - // Set s3a implementation to local file system for testing. - hadoopConf.set("fs.s3a.impl", "org.apache.spark.deploy.TestFileSystem") - // Disable file system impl cache to make sure the test file system is picked up. 
- hadoopConf.set("fs.s3a.impl.disable.cache", "true") + updateConfWithFakeS3Fs(hadoopConf) val sourcePaths = Seq("/local/file", s"s3a://${jarFile.getAbsolutePath}") val outputPaths = SparkSubmit.downloadFileList( sourcePaths.mkString(","), tmpDir, mutable.Map.empty, hadoopConf).split(",") @@ -859,6 +850,43 @@ class SparkSubmitSuite } } + test("Avoid re-upload remote resources in yarn client mode") { + val hadoopConf = new Configuration() + updateConfWithFakeS3Fs(hadoopConf) + + val tmpDir = Utils.createTempDir() + val file = File.createTempFile("tmpFile", "", tmpDir) + val pyFile = File.createTempFile("tmpPy", ".egg", tmpDir) + val mainResource = File.createTempFile("tmpPy", ".py", tmpDir) + val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir) + val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}" + + val args = Seq( + "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), + "--name", "testApp", + "--master", "yarn", + "--deploy-mode", "client", + "--jars", tmpJarPath, + "--files", s"s3a://${file.getAbsolutePath}", + "--py-files", s"s3a://${pyFile.getAbsolutePath}", + s"s3a://$mainResource" + ) + + val appArgs = new SparkSubmitArguments(args) + val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3 + + // All the resources should still be remote paths, so that YARN client will not upload again. + sysProps("spark.yarn.dist.jars") should be (tmpJarPath) + sysProps("spark.yarn.dist.files") should be (s"s3a://${file.getAbsolutePath}") + sysProps("spark.yarn.dist.pyFiles") should be (s"s3a://${pyFile.getAbsolutePath}") + + // Local repl jars should be a local path. + sysProps("spark.repl.local.jars") should (startWith("file:")) + + // local py files should not be a URI format. + sysProps("spark.submit.pyFiles") should (startWith("/")) + } + // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly. 
private def runSparkSubmit(args: Seq[String]): Unit = { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) @@ -898,6 +926,11 @@ class SparkSubmitSuite Utils.deleteRecursively(tmpDir) } } + + private def updateConfWithFakeS3Fs(conf: Configuration): Unit = { + conf.set("fs.s3a.impl", classOf[TestFileSystem].getCanonicalName) + conf.set("fs.s3a.impl.disable.cache", "true") + } } object JarCreationTest extends Logging { @@ -967,4 +1000,13 @@ class TestFileSystem extends org.apache.hadoop.fs.LocalFileSystem { // Ignore the scheme for testing. super.copyToLocalFile(new Path(src.toUri.getPath), dst) } + + override def globStatus(pathPattern: Path): Array[FileStatus] = { + val newPath = new Path(pathPattern.toUri.getPath) + super.globStatus(newPath).map { status => + val path = s"s3a://${status.getPath.toUri.getPath}" + status.setPath(new Path(path)) + status + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/1813c4a8/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala ---------------------------------------------------------------------- diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala index 9702a1e..0b16e1b 100644 --- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala +++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala @@ -57,7 +57,7 @@ object Main extends Logging { // Visible for testing private[repl] def doMain(args: Array[String], _interp: SparkILoop): Unit = { interp = _interp - val jars = Utils.getUserJars(conf, isShell = true) + val jars = Utils.getLocalUserJarsForShell(conf) // Remove file:///, file:// or file:/ scheme if exists for each jar .map { x => if (x.startsWith("file:")) new File(new URI(x)).getPath else x } .mkString(File.pathSeparator) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For 
additional commands, e-mail: commits-h...@spark.apache.org