This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 75b40a5 [SPARK-27575][CORE][YARN] Yarn file-related confs should merge new value with existing value 75b40a5 is described below commit 75b40a53d3fb49ccadf032dd8e75c554c67676d2 Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> AuthorDate: Mon Apr 29 10:14:59 2019 -0700 [SPARK-27575][CORE][YARN] Yarn file-related confs should merge new value with existing value ## What changes were proposed in this pull request? This patch fixes a bug in which YARN file-related configurations are overwritten when there are values to assign - e.g. if `--files` is specified as an argument, `spark.yarn.dist.files` is overwritten with the value of the argument. After this patch the existing value and the new value will be merged before being assigned to the configuration value. ## How was this patch tested? Added UT, and manually tested with below command: > ./bin/spark-submit --verbose --files /etc/spark2/conf/spark-defaults.conf.template --master yarn-cluster --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.4.0.jar 10 where the spark conf file has `spark.yarn.dist.files=file:/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties` ``` Spark config: ... (spark.yarn.dist.files,file:/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties,file:///etc/spark2/conf/spark-defaults.conf.template) ... ``` Closes #24465 from HeartSaVioR/SPARK-27575. 
Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../org/apache/spark/deploy/SparkSubmit.scala | 23 +++++-- .../org/apache/spark/deploy/SparkSubmitSuite.scala | 80 +++++++++++++++++++++- 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 9efaaa7..49d9395 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -544,10 +544,14 @@ private[spark] class SparkSubmit extends Logging { // Yarn only OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"), - OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"), - OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"), - OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"), - OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"), + OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles", + mergeFn = Some(mergeFileLists(_, _))), + OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars", + mergeFn = Some(mergeFileLists(_, _))), + OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files", + mergeFn = Some(mergeFileLists(_, _))), + OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives", + mergeFn = Some(mergeFileLists(_, _))), // Other options OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES, @@ -608,7 +612,13 @@ private[spark] class SparkSubmit extends Logging { (deployMode & opt.deployMode) != 0 && (clusterManager & opt.clusterManager) != 0) { if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) } - if (opt.confKey != 
null) { sparkConf.set(opt.confKey, opt.value) } + if (opt.confKey != null) { + if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) { + sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value)) + } else { + sparkConf.set(opt.confKey, opt.value) + } + } } } @@ -1381,7 +1391,8 @@ private case class OptionAssigner( clusterManager: Int, deployMode: Int, clOption: String = null, - confKey: String = null) + confKey: String = null, + mergeFn: Option[(String, String) => String] = None) private[spark] trait SparkSubmitOperation { diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index 2a17245..ef6213e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -18,11 +18,10 @@ package org.apache.spark.deploy import java.io._ -import java.net.URI +import java.net.{URI, URL} import java.nio.charset.StandardCharsets import java.nio.file.{Files, Paths} -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.io.Source @@ -897,6 +896,83 @@ class SparkSubmitSuite } } + test("SPARK-27575: yarn confs should merge new value with existing value") { + val tmpJarDir = Utils.createTempDir() + val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir) + val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir) + + val tmpJarDirYarnOpt = Utils.createTempDir() + val jar1YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "2"), tmpJarDirYarnOpt) + val jar2YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "USER2"), + tmpJarDirYarnOpt) + + val tmpFileDir = Utils.createTempDir() + val file1 = File.createTempFile("tmpFile1", "", tmpFileDir) + val file2 = File.createTempFile("tmpFile2", "", tmpFileDir) + + val tmpFileDirYarnOpt = Utils.createTempDir() + val 
file1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpFileDirYarnOpt) + val file2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpFileDirYarnOpt) + + val tmpPyFileDir = Utils.createTempDir() + val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir) + val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir) + + val tmpPyFileDirYarnOpt = Utils.createTempDir() + val pyFile1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpPyFileDirYarnOpt) + val pyFile2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpPyFileDirYarnOpt) + + val tmpArchiveDir = Utils.createTempDir() + val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir) + val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir) + + val tmpArchiveDirYarnOpt = Utils.createTempDir() + val archive1YarnOpt = File.createTempFile("archive1YarnOpt", ".zip", tmpArchiveDirYarnOpt) + val archive2YarnOpt = File.createTempFile("archive2YarnOpt", ".zip", tmpArchiveDirYarnOpt) + + val tempPyFile = File.createTempFile("tmpApp", ".py") + tempPyFile.deleteOnExit() + + val args = Seq( + "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"), + "--name", "testApp", + "--master", "yarn", + "--deploy-mode", "client", + "--jars", s"${tmpJarDir.getAbsolutePath}/*.jar", + "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*", + "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*", + "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip", + "--conf", "spark.yarn.dist.files=" + + s"${Seq(file1YarnOpt, file2YarnOpt).map(_.toURI.toString).mkString(",")}", + "--conf", "spark.yarn.dist.pyFiles=" + + s"${Seq(pyFile1YarnOpt, pyFile2YarnOpt).map(_.toURI.toString).mkString(",")}", + "--conf", "spark.yarn.dist.jars=" + + s"${Seq(jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).mkString(",")}", + "--conf", "spark.yarn.dist.archives=" + + s"${Seq(archive1YarnOpt, archive2YarnOpt).map(_.toURI.toString).mkString(",")}", + tempPyFile.toURI().toString()) + + def 
assertEqualsWithURLs(expected: Set[URL], confValue: String): Unit = { + val confValPaths = confValue.split(",").map(new Path(_)).toSet + assert(expected.map(u => new Path(u.toURI)) === confValPaths) + } + + def assertEqualsWithFiles(expected: Set[File], confValue: String): Unit = { + assertEqualsWithURLs(expected.map(_.toURI.toURL), confValue) + } + + val appArgs = new SparkSubmitArguments(args) + val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs) + assertEqualsWithURLs( + Set(jar1, jar2, jar1YarnOpt, jar2YarnOpt), conf.get("spark.yarn.dist.jars")) + assertEqualsWithFiles( + Set(file1, file2, file1YarnOpt, file2YarnOpt), conf.get("spark.yarn.dist.files")) + assertEqualsWithFiles( + Set(pyFile1, pyFile2, pyFile1YarnOpt, pyFile2YarnOpt), conf.get("spark.yarn.dist.pyFiles")) + assertEqualsWithFiles(Set(archive1, archive2, archive1YarnOpt, archive2YarnOpt), + conf.get("spark.yarn.dist.archives")) + } + // scalastyle:on println private def checkDownloadedFile(sourcePath: String, outputPath: String): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org