[GitHub] spark pull request #19130: [SPARK-21917][CORE][YARN] Supporting adding http(...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19130 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139609285

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -385,4 +385,14 @@ package object config {
       .checkValue(v => v > 0 && v <= Int.MaxValue,
         s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
       .createWithDefault(1024 * 1024)
+
+  private[spark] val FORCE_DOWNLOAD_SCHEMES =
+    ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
+      .doc("Comma-separated list of schemes for which files will be downloaded to the " +
+        "local disk prior to being added to YARN's distributed cache. For use in cases " +
+        "where the YARN service does not support schemes that are supported by Spark, like http, " +
+        "https, ftp.")
--- End diff --

ah got it
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139608374

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -385,4 +385,14 @@ package object config {
       .checkValue(v => v > 0 && v <= Int.MaxValue,
         s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
       .createWithDefault(1024 * 1024)
+
+  private[spark] val FORCE_DOWNLOAD_SCHEMES =
+    ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
+      .doc("Comma-separated list of schemes for which files will be downloaded to the " +
+        "local disk prior to being added to YARN's distributed cache. For use in cases " +
+        "where the YARN service does not support schemes that are supported by Spark, like http, " +
+        "https, ftp.")
--- End diff --

It is not required; we still want to leverage Hadoop's http(s) FS to distribute resources by default if it is supported in Hadoop 2.9+ (https://issues.apache.org/jira/browse/HADOOP-14383).
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139607663

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -385,4 +385,14 @@ package object config {
       .checkValue(v => v > 0 && v <= Int.MaxValue,
         s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
       .createWithDefault(1024 * 1024)
+
+  private[spark] val FORCE_DOWNLOAD_SCHEMES =
+    ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
+      .doc("Comma-separated list of schemes for which files will be downloaded to the " +
+        "local disk prior to being added to YARN's distributed cache. For use in cases " +
+        "where the YARN service does not support schemes that are supported by Spark, like http, " +
+        "https, ftp.")
--- End diff --

shall we make these 3 the default value of this config?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139607620

--- Diff: docs/running-on-yarn.md ---
@@ -212,6 +212,15 @@ To use a custom metrics.properties for the application master and executors, upd
+  spark.yarn.dist.forceDownloadSchemes
+  (none)
+
+    Comma-separated list of schemes for which files will be downloaded to the local disk prior to
+    being added to YARN's distributed cache. For use in cases where the YARN service does not
+    support schemes that are supported by Spark.
--- End diff --

update here too
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139577257

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -385,4 +385,13 @@ package object config {
       .checkValue(v => v > 0 && v <= Int.MaxValue,
         s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
       .createWithDefault(1024 * 1024)
+
+  private[spark] val FORCE_DOWNLOAD_SCHEMES =
+    ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
+      .doc("Comma-separated list of schemes for which files will be downloaded to the " +
+        "local disk prior to being added to YARN's distributed cache. For use in cases " +
+        "where the YARN service does not support schemes that are supported by Spark.")
--- End diff --

OK, sure.
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139577191

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,54 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN, for some remote resources with scheme:
+    // 1. Hadoop FileSystem doesn't support them.
+    // 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
+    // We will download them to local disk prior to add to YARN's distributed cache.
+    // For yarn client mode, since we already download them with above code, so we only need to
+    // figure out the local path and replace the remote one.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = () => {
+          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isSuccess
+        }
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable()
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
--- End diff --

No, it is not required, because the `shouldDownload` logic will handle this. If 1) the resource scheme is blacklisted, or 2) it is not supported by Hadoop, then Spark will handle it through the `downloadFile` method. Since "http" | "https" | "ftp" are not supported by Hadoop before 2.9, resources with such schemes will be handled by Spark itself.
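The branch logic described here can be sketched as a pure function. This is a minimal sketch with hypothetical stand-ins: `fsAvailable` plays the role of the `FileSystem.getFileSystemClass` probe, and `download` plays the role of `downloadFile`; the object and parameter names are invented for illustration.

```scala
object SchemeDispatchSketch {
  // A scheme must be downloaded by Spark if it is explicitly blacklisted,
  // or if no Hadoop FileSystem implementation is available for it.
  def shouldDownload(scheme: String,
                     forceDownloadSchemes: Seq[String],
                     fsAvailable: String => Boolean): Boolean =
    forceDownloadSchemes.contains(scheme) || !fsAvailable(scheme)

  // Dispatch on the resource's scheme: local resources pass through,
  // blacklisted/unsupported schemes are downloaded, the rest are left
  // for Hadoop's dist cache to handle.
  def resolveResource(scheme: String,
                      resource: String,
                      forceDownloadSchemes: Seq[String],
                      fsAvailable: String => Boolean,
                      download: String => String): String =
    scheme match {
      case "local" | "file" => resource
      case s if shouldDownload(s, forceDownloadSchemes, fsAvailable) =>
        download(resource)
      case _ => resource
    }
}
```

With `fsAvailable = _ => true` (every scheme supported), only blacklisted schemes get downloaded; with an unavailable scheme, the download branch fires regardless of the blacklist.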
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139576893

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,54 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN, for some remote resources with scheme:
+    // 1. Hadoop FileSystem doesn't support them.
+    // 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
+    // We will download them to local disk prior to add to YARN's distributed cache.
+    // For yarn client mode, since we already download them with above code, so we only need to
+    // figure out the local path and replace the remote one.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = () => {
--- End diff --

It can be, let me change the code.
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139576814

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,54 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN, for some remote resources with scheme:
--- End diff --

This is currently a problem only for YARN, because YARN uses the dist cache to distribute resources to the cluster, and the dist cache requires a supported Hadoop FS to copy resources. If a resource's scheme is http, it will try to find an http FS to handle the resource, which will fail since no http FS is supported in current Hadoop. In standalone and Mesos clusters, we use Spark's internal logic to handle http resources; that logic handles http(s) resources well, so there should be no issue for standalone and Mesos mode.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139576228

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -385,4 +385,13 @@ package object config {
       .checkValue(v => v > 0 && v <= Int.MaxValue,
         s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
       .createWithDefault(1024 * 1024)
+
+  private[spark] val FORCE_DOWNLOAD_SCHEMES =
+    ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
+      .doc("Comma-separated list of schemes for which files will be downloaded to the " +
+        "local disk prior to being added to YARN's distributed cache. For use in cases " +
+        "where the YARN service does not support schemes that are supported by Spark.")
--- End diff --

can you give an example of these schemes?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139576095

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,54 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN, for some remote resources with scheme:
+    // 1. Hadoop FileSystem doesn't support them.
+    // 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
+    // We will download them to local disk prior to add to YARN's distributed cache.
+    // For yarn client mode, since we already download them with above code, so we only need to
+    // figure out the local path and replace the remote one.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = () => {
+          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isSuccess
+        }
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable()
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
--- End diff --

shall we explicitly list "http" | "https" | "ftp"?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139575943

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,54 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN, for some remote resources with scheme:
+    // 1. Hadoop FileSystem doesn't support them.
+    // 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
+    // We will download them to local disk prior to add to YARN's distributed cache.
+    // For yarn client mode, since we already download them with above code, so we only need to
+    // figure out the local path and replace the remote one.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = () => {
--- End diff --

why make it a function? Can't we just inline it?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139575421

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,54 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN, for some remote resources with scheme:
--- End diff --

is it a problem only for YARN? Do standalone and Mesos have this problem?
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139053961

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,54 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN, for some remote resources with scheme:
+    // 1. Hadoop FileSystem doesn't support them.
+    // 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
+    // We will download them to local disk prior to add to YARN's distributed cache.
+    // For yarn client mode, since we already download them with above code, so we only need to
+    // figure out the local path and replace the remote one.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = () => {
+          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isSuccess
+        }
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable()
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
+            val file = new File(targetDir, new Path(uri).getName)
+            if (file.exists()) {
+              file.toURI.toString
+            } else {
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            }
+          case _ => uri.toString
+        }
+      }
+
+      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
+      args.files = Option(args.files).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
--- End diff --

I added a helper method in `Utils` and changed the related code in `SparkSubmit`. There are still some other places that require changes, but I will not touch them in this PR.
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r139041065

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -897,6 +897,76 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }
+
+  test("download remote resource if it is not supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = false)
+  }
+
+  test("avoid downloading remote resource if it is supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = true)
+  }
+
+  test("force downloading remote resource if it's scheme is configured") {
+    testRemoteResources(isHttpSchemeBlacklisted = true, supportMockHttpFs = true)
+  }
+
+  private def testRemoteResources(
+      isHttpSchemeBlacklisted: Boolean, supportMockHttpFs: Boolean): Unit = {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+    if (supportMockHttpFs) {
+      hadoopConf.set("fs.http.impl", classOf[TestFileSystem].getCanonicalName)
+      hadoopConf.set("fs.http.impl.disable.cache", "true")
+    }
+
+    val tmpDir = Utils.createTempDir()
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpS3Jar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpS3JarPath = s"s3a://${new File(tmpS3Jar.toURI).getAbsolutePath}"
+    val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", s"$tmpS3JarPath,$tmpHttpJarPath",
+      s"s3a://$mainResource"
+    ) ++ (
+      if (isHttpSchemeBlacklisted) {
+        Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http,https")
+      } else {
+        Nil
+      }
+    )
+
+    sys.props.put("spark.testing", "1")
--- End diff --

I haven't tried; I saw some UTs also set this configuration. Let me check whether it is explicitly required.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138980102

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,54 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN, for some remote resources with scheme:
+    // 1. Hadoop FileSystem doesn't support them.
+    // 2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
+    // We will download them to local disk prior to add to YARN's distributed cache.
+    // For yarn client mode, since we already download them with above code, so we only need to
+    // figure out the local path and replace the remote one.
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = () => {
+          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isSuccess
+        }
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable()
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
+            val file = new File(targetDir, new Path(uri).getName)
+            if (file.exists()) {
+              file.toURI.toString
+            } else {
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            }
+          case _ => uri.toString
+        }
+      }
+
+      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
+      args.files = Option(args.files).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
--- End diff --

Code like this (break a comma-separate string into a list) is copy & pasted in so many places that it probably deserves a method in `Utils`. There's one in `ConfigHelpers.stringToSeq` but that class is private to its package.
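A minimal version of the suggested helper (hypothetical object name; it simply mirrors the behavior of the repeated `split`/`trim`/`filter` snippet) might look like:

```scala
object StringSeqSketch {
  // Hypothetical Utils.stringToSeq-style helper: split a comma-separated
  // string, trim whitespace around each entry, and drop empty entries.
  def stringToSeq(str: String): Seq[String] =
    str.split(",").map(_.trim).filter(_.nonEmpty).toSeq
}
```

With such a helper, each `files.split(",").map(_.trim).filter(_.nonEmpty)` call site collapses to `stringToSeq(files)`.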
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138981630

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -897,6 +897,76 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }
+
+  test("download remote resource if it is not supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = false)
+  }
+
+  test("avoid downloading remote resource if it is supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = true)
+  }
+
+  test("force downloading remote resource if it's scheme is configured") {
+    testRemoteResources(isHttpSchemeBlacklisted = true, supportMockHttpFs = true)
+  }
+
+  private def testRemoteResources(
+      isHttpSchemeBlacklisted: Boolean, supportMockHttpFs: Boolean): Unit = {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+    if (supportMockHttpFs) {
+      hadoopConf.set("fs.http.impl", classOf[TestFileSystem].getCanonicalName)
+      hadoopConf.set("fs.http.impl.disable.cache", "true")
+    }
+
+    val tmpDir = Utils.createTempDir()
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpS3Jar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpS3JarPath = s"s3a://${new File(tmpS3Jar.toURI).getAbsolutePath}"
+    val tmpHttpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpHttpJarPath = s"http://${new File(tmpHttpJar.toURI).getAbsolutePath}"
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", s"$tmpS3JarPath,$tmpHttpJarPath",
+      s"s3a://$mainResource"
+    ) ++ (
+      if (isHttpSchemeBlacklisted) {
+        Seq("--conf", "spark.yarn.dist.forceDownloadSchemes=http,https")
+      } else {
+        Nil
+      }
+    )
+
+    sys.props.put("spark.testing", "1")
--- End diff --

This should already be set by the build scripts, was it not working for you?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138980897

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -897,6 +897,76 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }
+
+  test("download remote resource if it is not supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = false)
+  }
+
+  test("avoid downloading remote resource if it is supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = true)
+  }
+
+  test("force downloading remote resource if it's scheme is configured") {
+    testRemoteResources(isHttpSchemeBlacklisted = true, supportMockHttpFs = true)
+  }
+
+  private def testRemoteResources(
+      isHttpSchemeBlacklisted: Boolean, supportMockHttpFs: Boolean): Unit = {
--- End diff --

break multi-line args one per line.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138981253

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -897,6 +897,76 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }
+
+  test("download remote resource if it is not supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = false)
+  }
+
+  test("avoid downloading remote resource if it is supported by yarn service") {
+    testRemoteResources(isHttpSchemeBlacklisted = false, supportMockHttpFs = true)
+  }
+
+  test("force downloading remote resource if it's scheme is configured") {
--- End diff --

Better: "force download from blacklisted schemes"
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138978855

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -123,6 +123,11 @@ private[deploy] object DependencyUtils {
     uri.getScheme match {
       case "file" | "local" => path
+      case "http" | "https" | "ftp" if Utils.isTesting =>
--- End diff --

Sounds good.
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138827462

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -123,6 +123,11 @@ private[deploy] object DependencyUtils {
     uri.getScheme match {
       case "file" | "local" => path
+      case "http" | "https" | "ftp" if Utils.isTesting =>
--- End diff --

@vanzin, it is a little difficult to mock the download behavior, so here I check whether "spark.testing" is configured and return a dummy local path if it is. What do you think about this approach?
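The testing shortcut described here can be sketched as a small pure function. This is a sketch under assumptions: `isTesting` stands in for `Utils.isTesting` (which checks the "spark.testing" system property), and the `/tmp` dummy directory is invented for illustration.

```scala
import java.io.File
import java.net.URI

object TestingShortcutSketch {
  // For http/https/ftp resources under test, skip the real download and
  // return a dummy local path derived from the resource's file name.
  def resolveForTest(path: String, isTesting: Boolean): String = {
    val uri = new URI(path)
    uri.getScheme match {
      case "http" | "https" | "ftp" if isTesting =>
        new File("/tmp", new File(uri.getPath).getName).getPath
      case _ => path
    }
  }
}
```

Outside of testing (`isTesting = false`), the path is returned unchanged and the normal download path applies.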
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138801682

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -897,6 +897,80 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }
+
+  test("handle remote http(s) resources in yarn mode") {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+
+    val tmpDir = Utils.createTempDir()
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+    // This assumes UT environment could access external network.
--- End diff --

Yes, that's my concern; let me think of another way to handle this.
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138801550

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,53 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN cluster manager,
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      // Check the scheme list provided by "spark.yarn.dist.forceDownloadSchemes" to see if current
+      // resource's scheme is included in this list, or Hadoop FileSystem doesn't support current
+      // scheme, if so Spark will download the resources to local disk and upload to Hadoop FS.
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }
+          .map(_ => true).getOrElse(false)
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
+            if (deployMode == CLIENT) {
+              // In client mode, we already download the resources, so figuring out the local one
+              // should be enough.
+              val fileName = new Path(uri).getName
+              new File(targetDir, fileName).toURI.toString
+            } else {
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            }
+          case _ => uri.toString
+        }
+      }
+
+      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
+      args.files = Option(args.files).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.pyFiles = Option(args.pyFiles).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.jars = Option(args.jars).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.archives = Option(args.archives).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
--- End diff --

From the code, `--files` and `--jars` have overwritten `spark.yarn.*` for a long time AFAIK. What I think is that we should make `spark.yarn.*` internal configurations to reduce the discrepancy.
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138791246

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,53 @@ object SparkSubmit extends CommandLineUtils with Logging {
       }.orNull
     }
+
+    // When running in YARN cluster manager,
--- End diff --

Sorry for the broken comment, my bad, I will fix it.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138694227

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -897,6 +897,80 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }
+
+  test("handle remote http(s) resources in yarn mode") {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+
+    val tmpDir = Utils.createTempDir()
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+    // This assumes UT environment could access external network.
--- End diff --

It would be better if tests could avoid this... you could start a local http server, but that feels like a lot of work. Is there some way to mock the behavior instead?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138692611

--- Diff: docs/running-on-yarn.md ---
@@ -212,6 +212,14 @@ To use a custom metrics.properties for the application master and executors, upd

+  spark.yarn.dist.forceDownloadSchemes
+  (none)
+
+    Comma-separated schemes in which remote resources have to download to local disk and upload

--- End diff --

Better wording:

    Comma-separated list of schemes for which files will be downloaded to the local disk prior to being added to YARN's distributed cache. For use in cases where the YARN service does not support schemes that are supported by Spark.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138689342

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,53 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }.orNull
   }

+    // When running in YARN cluster manager,

--- End diff --

"When running in YARN cluster manager, ?"
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138694708

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -897,6 +897,80 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }

+  test("handle remote http(s) resources in yarn mode") {

--- End diff --

It seems you have 3 different tests in this block (at least), could you break them into separate tests?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138689976

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,53 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }.orNull
   }

+    // When running in YARN cluster manager,
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      // Check the scheme list provided by "spark.yarn.dist.forceDownloadSchemes" to see if current
+      // resource's scheme is included in this list, or Hadoop FileSystem doesn't support current
+      // scheme, if so Spark will download the resources to local disk and upload to Hadoop FS.
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }
+          .map(_ => true).getOrElse(false)

--- End diff --

`Try { ... }.isSuccess`? You could also avoid this call if the scheme is in the blacklist.
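(The suggested shape, sketched with a hypothetical stand-in for Hadoop's `FileSystem.getFileSystemClass` so it runs standalone: the blacklist check short-circuits before the potentially costly filesystem lookup, and `Try(...).isFailure` replaces the `map`/`getOrElse` dance:)

```scala
import scala.util.Try

// Hypothetical stand-in for FileSystem.getFileSystemClass(scheme, hadoopConf):
// pretend only hdfs and s3a have registered FileSystem implementations.
def getFileSystemClass(scheme: String): Class[_] = scheme match {
  case "hdfs" | "s3a" => classOf[Object]
  case other => throw new UnsupportedOperationException(s"No FileSystem for scheme: $other")
}

val forceDownloadSchemes = Seq("http", "https")

// Blacklisted schemes skip the lookup entirely (|| short-circuits); otherwise
// download only when no FileSystem implementation exists for the scheme.
def shouldDownload(scheme: String): Boolean =
  forceDownloadSchemes.contains(scheme) ||
    Try { getFileSystemClass(scheme) }.isFailure
```

(With this ordering, a scheme listed in the config never triggers the lookup at all, which also avoids swallowing an exception just to turn it into `false`.)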
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138694417

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -897,6 +897,80 @@ class SparkSubmitSuite
     sysProps("spark.submit.pyFiles") should (startWith("/"))
   }

+  test("handle remote http(s) resources in yarn mode") {
+    val hadoopConf = new Configuration()
+    updateConfWithFakeS3Fs(hadoopConf)
+
+    val tmpDir = Utils.createTempDir()
+    val mainResource = File.createTempFile("tmpPy", ".py", tmpDir)
+    val tmpJar = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpDir)
+    val tmpJarPath = s"s3a://${new File(tmpJar.toURI).getAbsolutePath}"
+    // This assumes UT environment could access external network.
+    val remoteHttpJar =
+      "http://central.maven.org/maven2/io/dropwizard/metrics/metrics-core/" +
+        "3.2.4/metrics-core-3.2.4.jar"
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", s"$tmpJarPath,$remoteHttpJar",
+      s"s3a://$mainResource"
+    )
+
+    val appArgs = new SparkSubmitArguments(args)
+    val sysProps = SparkSubmit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))._3
+
+    // Resources in S3 should still be remote path, but remote http resource will be downloaded

--- End diff --

...still are... Also I'm not sure I understand the comment.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138693449

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,53 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }.orNull
   }

+    // When running in YARN cluster manager,
+    if (clusterManager == YARN) {
+      sparkConf.setIfMissing(SecurityManager.SPARK_AUTH_SECRET_CONF, "unused")
+      val secMgr = new SecurityManager(sparkConf)
+      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)
+
+      // Check the scheme list provided by "spark.yarn.dist.forceDownloadSchemes" to see if current
+      // resource's scheme is included in this list, or Hadoop FileSystem doesn't support current
+      // scheme, if so Spark will download the resources to local disk and upload to Hadoop FS.
+      def shouldDownload(scheme: String): Boolean = {
+        val isFsAvailable = Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }
+          .map(_ => true).getOrElse(false)
+        forceDownloadSchemes.contains(scheme) || !isFsAvailable
+      }
+
+      def downloadResource(resource: String): String = {
+        val uri = Utils.resolveURI(resource)
+        uri.getScheme match {
+          case "local" | "file" => resource
+          case e if shouldDownload(e) =>
+            if (deployMode == CLIENT) {
+              // In client mode, we already download the resources, so figuring out the local one
+              // should be enough.
+              val fileName = new Path(uri).getName
+              new File(targetDir, fileName).toURI.toString
+            } else {
+              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
+            }
+          case _ => uri.toString
+        }
+      }
+
+      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
+      args.files = Option(args.files).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.pyFiles = Option(args.pyFiles).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.jars = Option(args.jars).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull
+      args.archives = Option(args.archives).map { files =>
+        files.split(",").map(_.trim).filter(_.nonEmpty).map { downloadResource }.mkString(",")
+      }.orNull

--- End diff --

I was going to say this is missing `spark.yarn.dist.files` and `.jars`, but later those properties seem to be set based on `args.files` and `args.jars`. Which kinda raises the question of what happens when the user sets both. From the documentation it sounds like that should work (both sets of files get added), but from the code it seems `--files` and `--jars` would overwrite the `spark.yarn.*` configs... In any case, that's not the fault of your change.
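(As an aside, the four identical split/trim/filter/map/mkString blocks in the diff could collapse into one helper. A sketch with a dummy `downloadResource`; the real one in the PR actually downloads the file and returns its local URI:)

```scala
// Dummy stand-in for the PR's downloadResource: here it just tags the input
// so the transformation is visible.
def downloadResource(resource: String): String = s"file:/local/$resource"

// One helper covers the comma-separated files/pyFiles/jars/archives fields,
// preserving the null-in/null-out behavior of the original Option(...).orNull
// code and dropping empty entries between commas.
def downloadList(resources: String): String =
  Option(resources).map { rs =>
    rs.split(",").map(_.trim).filter(_.nonEmpty).map(downloadResource).mkString(",")
  }.orNull
```

(Each `args.files = ...` assignment then becomes `args.files = downloadList(args.files)`, and so on for the other fields.)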
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138154503

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,52 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }.orNull
   }

+    // Using a dummy http URI to check if HTTP(s) FileSystem is available, it returns true in
+    // Hadoop 2.9+, otherwise it returns false.
+    val isHttpFsAvailable = Try { FileSystem.get(Utils.resolveURI("http://foo/bar"), hadoopConf) }
+      .map(_ => true)
+      .getOrElse(false)
+    // When running in YARN cluster manager, we check the configuration
+    // "spark.yarn.dist.forceDownloadResources", if true we always download remote HTTP(s)
+    // resources to local and then re-upload them to Hadoop FS, if false we need to check the
+    // availability of HTTP(s) FileSystem to decide whether to use HTTP(s) FS to handle resources
+    // or not.
+    if (clusterManager == YARN && (sparkConf.get(FORCE_DOWNLOAD_RESOURCES) || !isHttpFsAvailable)) {

--- End diff --

Do we somehow want to make this configurable per scheme? Right now it's basically http/https; in the future we might want to handle other filesystems that Hadoop doesn't support. Making this a settable config would make that easier.
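(This per-scheme knob is what the PR eventually adopted as `spark.yarn.dist.forceDownloadSchemes`, visible in a later revision of the diff. Independent of Spark's `ConfigBuilder` machinery, the parsing and lookup can be sketched as:)

```scala
// Parse a comma-separated scheme list, e.g. the value of
// "spark.yarn.dist.forceDownloadSchemes", tolerating whitespace and empty
// entries and normalizing case.
def parseSchemes(value: String): Set[String] =
  value.split(",").map(_.trim.toLowerCase).filter(_.nonEmpty).toSet

val forceDownloadSchemes = parseSchemes("http, https, ftp")

// Any scheme in the set is always downloaded to local disk first, regardless
// of whether Hadoop has a FileSystem implementation for it.
def forcedToDownload(scheme: String): Boolean =
  forceDownloadSchemes.contains(scheme.toLowerCase)
```

(Compared to a boolean `forceDownloadResources`, a scheme list lets operators opt individual filesystems in or out without code changes, which is the flexibility the reviewer is after.)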
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/19130#discussion_r138151499

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -367,6 +368,52 @@ object SparkSubmit extends CommandLineUtils with Logging {
     }.orNull
   }

+    // Using a dummy http URI to check if HTTP(s) FileSystem is available, it returns true in
+    // Hadoop 2.9+, otherwise it returns false.
+    val isHttpFsAvailable = Try { FileSystem.get(Utils.resolveURI("http://foo/bar"), hadoopConf) }

--- End diff --

There is a `FileSystem.getFileSystemClass` function we could use here instead of calling a dummy URI.