This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 75b40a5  [SPARK-27575][CORE][YARN] Yarn file-related confs should merge new value with existing value
75b40a5 is described below

commit 75b40a53d3fb49ccadf032dd8e75c554c67676d2
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Mon Apr 29 10:14:59 2019 -0700

    [SPARK-27575][CORE][YARN] Yarn file-related confs should merge new value with existing value
    
    ## What changes were proposed in this pull request?
    
    This patch fixes a bug where YARN file-related configurations were being overwritten whenever there was a new value to assign - e.g. if `--files` is specified as an argument, `spark.yarn.dist.files` was overwritten with the argument's value. After this patch, the existing value and the new value are merged before being assigned to the configuration.
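
    For illustration, the merge reduces to joining the existing and the new comma-separated list (a minimal sketch, assuming `mergeFileLists` - the existing `SparkSubmit` helper this patch reuses - simply drops blank entries and concatenates):

    ```scala
    // Sketch of the merge semantics; the real helper lives in SparkSubmit
    // and may normalize entries further.
    def mergeFileLists(lists: String*): String =
      lists.filter(s => s != null && s.trim.nonEmpty).mkString(",")

    val existing = "file:/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties"
    val fromArgs = "file:///etc/spark2/conf/spark-defaults.conf.template"

    // Before this patch the second value replaced the first; now both survive.
    assert(mergeFileLists(existing, fromArgs) == s"$existing,$fromArgs")
    ```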
    
    ## How was this patch tested?
    
    Added a unit test, and manually tested with the command below:
    
    > ./bin/spark-submit --verbose --files /etc/spark2/conf/spark-defaults.conf.template --master yarn-cluster --class org.apache.spark.examples.SparkPi examples/jars/spark-examples_2.11-2.4.0.jar 10
    
    where the Spark conf file has

    `spark.yarn.dist.files=file:/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties`
    
    ```
    Spark config:
    ...
    (spark.yarn.dist.files,file:/etc/spark2/conf/atlas-application.properties.yarn#atlas-application.properties,file:///etc/spark2/conf/spark-defaults.conf.template)
    ...
    ```
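
    The mechanics of the fix in isolation: `OptionAssigner` gains an optional merge function, and assignment consults it only when the conf key already has a value (a simplified stand-in; `OptionAssigner` and the conf map below are reduced to the bare shapes needed to show the control flow of the diff):

    ```scala
    // Simplified stand-ins for the real OptionAssigner and SparkConf.
    case class OptionAssigner(
        value: String,
        confKey: String,
        mergeFn: Option[(String, String) => String] = None)

    val sparkConf = scala.collection.mutable.Map[String, String]()
    sparkConf("spark.yarn.dist.files") = "file:/existing.properties"

    val opt = OptionAssigner(
      value = "file:///from-cli.template",
      confKey = "spark.yarn.dist.files",
      mergeFn = Some((existing, added) => s"$existing,$added"))

    // Merge with the existing value when a mergeFn is present; otherwise overwrite.
    sparkConf(opt.confKey) = opt.mergeFn match {
      case Some(fn) if sparkConf.contains(opt.confKey) =>
        fn(sparkConf(opt.confKey), opt.value)
      case _ => opt.value
    }
    // -> "file:/existing.properties,file:///from-cli.template"
    ```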
    
    Closes #24465 from HeartSaVioR/SPARK-27575.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      | 23 +++++--
 .../org/apache/spark/deploy/SparkSubmitSuite.scala | 80 +++++++++++++++++++++-
 2 files changed, 95 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9efaaa7..49d9395 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -544,10 +544,14 @@ private[spark] class SparkSubmit extends Logging {
 
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
-      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
-      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
-      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
-      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
+      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
+        mergeFn = Some(mergeFileLists(_, _))),
 
       // Other options
       OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
@@ -608,7 +612,13 @@ private[spark] class SparkSubmit extends Logging {
           (deployMode & opt.deployMode) != 0 &&
           (clusterManager & opt.clusterManager) != 0) {
         if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
-        if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
+        if (opt.confKey != null) {
+          if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
+            sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
+          } else {
+            sparkConf.set(opt.confKey, opt.value)
+          }
+        }
       }
     }
 
@@ -1381,7 +1391,8 @@ private case class OptionAssigner(
     clusterManager: Int,
     deployMode: Int,
     clOption: String = null,
-    confKey: String = null)
+    confKey: String = null,
+    mergeFn: Option[(String, String) => String] = None)
 
 private[spark] trait SparkSubmitOperation {
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 2a17245..ef6213e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -18,11 +18,10 @@
 package org.apache.spark.deploy
 
 import java.io._
-import java.net.URI
+import java.net.{URI, URL}
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Paths}
 
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
@@ -897,6 +896,83 @@ class SparkSubmitSuite
     }
   }
 
+  test("SPARK-27575: yarn confs should merge new value with existing value") {
+    val tmpJarDir = Utils.createTempDir()
+    val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
+    val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir)
+
+    val tmpJarDirYarnOpt = Utils.createTempDir()
+    val jar1YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "2"), tmpJarDirYarnOpt)
+    val jar2YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "USER2"),
+      tmpJarDirYarnOpt)
+
+    val tmpFileDir = Utils.createTempDir()
+    val file1 = File.createTempFile("tmpFile1", "", tmpFileDir)
+    val file2 = File.createTempFile("tmpFile2", "", tmpFileDir)
+
+    val tmpFileDirYarnOpt = Utils.createTempDir()
+    val file1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpFileDirYarnOpt)
+    val file2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpFileDirYarnOpt)
+
+    val tmpPyFileDir = Utils.createTempDir()
+    val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir)
+    val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir)
+
+    val tmpPyFileDirYarnOpt = Utils.createTempDir()
+    val pyFile1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpPyFileDirYarnOpt)
+    val pyFile2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpPyFileDirYarnOpt)
+
+    val tmpArchiveDir = Utils.createTempDir()
+    val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
+    val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
+
+    val tmpArchiveDirYarnOpt = Utils.createTempDir()
+    val archive1YarnOpt = File.createTempFile("archive1YarnOpt", ".zip", tmpArchiveDirYarnOpt)
+    val archive2YarnOpt = File.createTempFile("archive2YarnOpt", ".zip", tmpArchiveDirYarnOpt)
+
+    val tempPyFile = File.createTempFile("tmpApp", ".py")
+    tempPyFile.deleteOnExit()
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", s"${tmpJarDir.getAbsolutePath}/*.jar",
+      "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
+      "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
+      "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
+      "--conf", "spark.yarn.dist.files=" +
+        s"${Seq(file1YarnOpt, file2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      "--conf", "spark.yarn.dist.pyFiles=" +
+        s"${Seq(pyFile1YarnOpt, pyFile2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      "--conf", "spark.yarn.dist.jars=" +
+        s"${Seq(jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      "--conf", "spark.yarn.dist.archives=" +
+        s"${Seq(archive1YarnOpt, archive2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      tempPyFile.toURI().toString())
+
+    def assertEqualsWithURLs(expected: Set[URL], confValue: String): Unit = {
+      val confValPaths = confValue.split(",").map(new Path(_)).toSet
+      assert(expected.map(u => new Path(u.toURI)) === confValPaths)
+    }
+
+    def assertEqualsWithFiles(expected: Set[File], confValue: String): Unit = {
+      assertEqualsWithURLs(expected.map(_.toURI.toURL), confValue)
+    }
+
+    val appArgs = new SparkSubmitArguments(args)
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
+    assertEqualsWithURLs(
+      Set(jar1, jar2, jar1YarnOpt, jar2YarnOpt), conf.get("spark.yarn.dist.jars"))
+    assertEqualsWithFiles(
+      Set(file1, file2, file1YarnOpt, file2YarnOpt), conf.get("spark.yarn.dist.files"))
+    assertEqualsWithFiles(
+      Set(pyFile1, pyFile2, pyFile1YarnOpt, pyFile2YarnOpt), conf.get("spark.yarn.dist.pyFiles"))
+    assertEqualsWithFiles(Set(archive1, archive2, archive1YarnOpt, archive2YarnOpt),
+      conf.get("spark.yarn.dist.archives"))
+  }
+
   // scalastyle:on println
 
  private def checkDownloadedFile(sourcePath: String, outputPath: String): Unit = {

