Repository: spark Updated Branches: refs/heads/branch-2.1 3e139e239 -> 569f77a11
[SPARK-18099][YARN] Fail if same files added to distributed cache for --files and --archives ## What changes were proposed in this pull request? During spark-submit, if the YARN distributed cache is instructed to add the same file under both --files and --archives, this change ensures the Spark YARN distributed-cache behaviour is retained, i.e. submission fails with an error if the same file is mentioned in both --files and --archives. ## How was this patch tested? Manually tested: 1. If the same jar is mentioned in --jars and --files, the job is still submitted; the functionality from [SPARK-14423] (#12203) is unchanged. 2. If the same file is mentioned in --files and --archives, job submission fails. Please review https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before opening a pull request. … under archives and files Author: Kishor Patil <kpa...@yahoo-inc.com> Closes #15627 from kishorvpatil/spark18099. (cherry picked from commit 098e4ca9c7af61e64839a50c65be449749af6482) Signed-off-by: Tom Graves <tgra...@yahoo-inc.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/569f77a1 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/569f77a1 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/569f77a1 Branch: refs/heads/branch-2.1 Commit: 569f77a11819523bdf5dc2c6429fc3399cbb6519 Parents: 3e139e2 Author: Kishor Patil <kpa...@yahoo-inc.com> Authored: Thu Nov 3 16:10:26 2016 -0500 Committer: Tom Graves <tgra...@yahoo-inc.com> Committed: Thu Nov 3 16:12:05 2016 -0500 ---------------------------------------------------------------------- .../org/apache/spark/deploy/yarn/Client.scala | 12 +++++- .../apache/spark/deploy/yarn/ClientSuite.scala | 42 ++++++++++++++++++++ 2 files changed, 52 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/spark/blob/569f77a1/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 053a786..172fb46 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -598,8 +598,16 @@ private[spark] class Client(
     ).foreach { case (flist, resType, addToClasspath) =>
       flist.foreach { file =>
         val (_, localizedPath) = distribute(file, resType = resType)
-        if (addToClasspath && localizedPath != null) {
-          cachedSecondaryJarLinks += localizedPath
+        // Null localizedPath: already cached. Skip duplicate jars; reject duplicate files/archives.
+        if (addToClasspath) {
+          if (localizedPath != null) {
+            cachedSecondaryJarLinks += localizedPath
+          }
+        } else {
+          if (localizedPath == null) {
+            throw new IllegalArgumentException(s"Attempt to add ($file) multiple times" +
+              " to the distributed cache.")
+          }
         }
       }
     }
http://git-wip-us.apache.org/repos/asf/spark/blob/569f77a1/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 0a4f291..06516c1 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -282,6 +282,48 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     }
   }
 
+  test("distribute archive multiple times") {
+    val libs = Utils.createTempDir()
+    // Create jars dir and RELEASE file to avoid IllegalStateException.
+    val jarsDir = new File(libs, "jars")
+    assert(jarsDir.mkdir())
+    new FileOutputStream(new File(libs, "RELEASE")).close()
+
+    val userLib1 = Utils.createTempDir()
+    val testJar = TestUtils.createJarWithFiles(Map(), userLib1)
+
+    // Case 1: FILES_TO_DISTRIBUTE and ARCHIVES_TO_DISTRIBUTE can't have duplicate files.
+    val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+      .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath))
+      .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath))
+
+    val client = createClient(sparkConf)
+    val tempDir = Utils.createTempDir()
+    intercept[IllegalArgumentException] {
+      client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
+    }
+
+    // Case 2: FILES_TO_DISTRIBUTE can't have duplicate files.
+    val sparkConfFiles = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+      .set(FILES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))
+
+    val clientFiles = createClient(sparkConfFiles)
+    val tempDirForFiles = Utils.createTempDir()
+    intercept[IllegalArgumentException] {
+      clientFiles.prepareLocalResources(new Path(tempDirForFiles.getAbsolutePath()), Nil)
+    }
+
+    // Case 3: ARCHIVES_TO_DISTRIBUTE can't have duplicate files.
+    val sparkConfArchives = new SparkConfWithEnv(Map("SPARK_HOME" -> libs.getAbsolutePath))
+      .set(ARCHIVES_TO_DISTRIBUTE, Seq(testJar.getPath, testJar.getPath))
+
+    val clientArchives = createClient(sparkConfArchives)
+    val tempDirForArchives = Utils.createTempDir()
+    intercept[IllegalArgumentException] {
+      clientArchives.prepareLocalResources(new Path(tempDirForArchives.getAbsolutePath()), Nil)
+    }
+  }
+
   test("distribute local spark jars") {
     val temp = Utils.createTempDir()
     val jarsDir = new File(temp, "jars")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org