[GitHub] spark pull request #20853: [SPARK-23729][CORE] Respect URI fragment when res...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/20853

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r176154914

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -657,6 +662,30 @@ class SparkSubmitSuite
     conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }

+  test("ambiguous archive mapping results in error message") {
+    val dir = Utils.createTempDir()
+    val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+    val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+    Files.createFile(archive1)
+    Files.createFile(archive2)
+    val jars = "/jar1,/jar2"         // --jars
+    val files = "local:/file1,file2" // --files
+    // --archives

--- End diff --

Unnecessary comment. I know the other test has them, but I'd just remove these from this new code, since they don't add any useful information.
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r176152842

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,32 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(path)
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+          s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+        case (resolved, Some(namedAs)) => resolved.map( _ + "#" + namedAs)
+        case (resolved, _) => resolved
       }
     }.mkString(",")
   }

+  private def splitOnFragment(path: String): (URI, Option[String]) = {
+    val uri = Utils.resolveURI(path)
+    val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
+    val fragment = if (uri.getFragment != null) Some(uri.getFragment) else None

--- End diff --

`Option(uri.getFragment)`
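The suggestion relies on `Option.apply` mapping `null` to `None`, which replaces the explicit null check in the diff. A minimal sketch (the helper name `fragmentOf` is hypothetical, not from the PR):

```scala
import java.net.URI

// Option.apply returns None for null, so the if/else null check
// collapses to a one-liner. `fragmentOf` is a hypothetical helper name.
def fragmentOf(uri: URI): Option[String] = Option(uri.getFragment)

// fragmentOf(new URI("file:/a.zip#alias")) yields Some("alias")
// fragmentOf(new URI("file:/a.zip")) yields None
```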
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r176155080

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -657,6 +662,30 @@ class SparkSubmitSuite
     conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }

+  test("ambiguous archive mapping results in error message") {
+    val dir = Utils.createTempDir()
+    val archive1 = Paths.get(dir.toPath.toString, "first.zip")
+    val archive2 = Paths.get(dir.toPath.toString, "second.zip")
+    Files.createFile(archive1)
+    Files.createFile(archive2)
+    val jars = "/jar1,/jar2"         // --jars
+    val files = "local:/file1,file2" // --files
+    // --archives
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+    val pyFiles = "py-file1,py-file2" // --py-files
+
+    // Test files and archives (Yarn)

--- End diff --

Unnecessary comment.
Github user misutoth commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175664716

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,32 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(path)
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+          s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")

--- End diff --

There was no space used here before. Actually there should not be any space in the resulting list; the tests also rely on this.
Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175662764

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,17 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException => printErrorAndExit(e.getMessage); throw e

--- End diff --

nit:
```
case e: SparkException =>
  printErrorAndExit(e.getMessage)
  throw e
```
Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175662672

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,32 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(path)
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+          s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+        case (resolved, Some(namedAs)) => resolved.map( _ + "#" + namedAs)
+        case (resolved, _) => resolved
       }
     }.mkString(",")
   }

+  private def splitOnFragment(path: String): (URI, Option[String]) = {
+    val uri = Utils.resolveURI(path)
+    val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
+    val fragment = if (uri.getFragment != null) Some(uri.getFragment) else None
+    (withoutFragment, fragment)
+  }
+
+  private def resolveGlobPath(uri: URI, hadoopConf: Configuration): Array [String] = {

--- End diff --

nit: `Array[String]`
Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175662574

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,32 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(path)
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved, Some(_)) if resolved.length > 1 => throw new SparkException(
+          s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")

--- End diff --

nit: `resolved.mkString(", ")`
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175585634

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,36 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(Utils.resolveURI(path))
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved: Array[String], Some(_)) if resolved.length > 1 => throw new SparkException(
+          s"${base.toString} resolves ambiguously to multiple files: ${resolved.mkString(",")}")
+        case (resolved: Array[String], Some(namedAs)) => resolved.map( _ + "#" + namedAs)

--- End diff --

Same here.
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175585581

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +138,36 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val (base, fragment) = splitOnFragment(Utils.resolveURI(path))
+      (resolveGlobPath(base, hadoopConf), fragment) match {
+        case (resolved: Array[String], Some(_)) if resolved.length > 1 => throw new SparkException(

--- End diff --

Type inference is not working here?
Github user misutoth commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175576249

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw new RuntimeException("Unreachable production code")

--- End diff --

Actually the directory deletion is hooked into JVM shutdown, so I will let that do the housekeeping for us and avoid adding a new field.
Github user misutoth commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175575718

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val spath = path.split('#')

--- End diff --

You are right. It took some time to work out how to clone a URI without the fragment part, though; the next version will include that.
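"Cloning a URI without the fragment part" can be done with `java.net.URI`'s three-argument constructor, rebuilding from the scheme and scheme-specific part and passing `null` as the fragment. A minimal sketch (the helper name `stripFragment` is hypothetical):

```scala
import java.net.URI

// Rebuild the URI without its fragment, returning the fragment separately.
// `stripFragment` is a hypothetical helper name, not from the PR.
def stripFragment(uri: URI): (URI, Option[String]) = {
  val withoutFragment = new URI(uri.getScheme, uri.getSchemeSpecificPart, null)
  (withoutFragment, Option(uri.getFragment))
}

val (base, frag) = stripFragment(new URI("hdfs:/tmp/archives/*.zip#renamed"))
// base is hdfs:/tmp/archives/*.zip, frag is Some("renamed")
```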
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175567819

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw new RuntimeException("Unreachable production code")

--- End diff --

+1
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175554046

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw new RuntimeException("Unreachable production code")

--- End diff --

Maybe just let the exception propagate? That's what a lot of this code does... then you don't need to change this file at all.
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175552660

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val spath = path.split('#')

--- End diff --

Why not use `Utils.resolveURI` as before? Parsing URIs by hand is very sketchy.
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175553679

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val spath = path.split('#')
+      val renameAs = if (spath.length > 1) Some(spath(1)) else None
+      val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
+      resolved match {

--- End diff --

This whole match block is a little ugly, but I'll wait to see how you implement Gabor's suggestion...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175553185

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler

+  var dir: File = null

--- End diff --

Yeah, there is a good example here: `test("launch simple application with spark-submit with redaction")`
Github user vanzin commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175552387

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -606,9 +612,12 @@ class SparkSubmitSuite
   }

   test("resolves command line argument paths correctly") {
+    val archive = Paths.get(dir.toPath.toString, "single.zip")
+    Files.createFile(archive)
     val jars = "/jar1,/jar2"                 // --jars
     val files = "local:/file1,file2"         // --files
-    val archives = "file:/archive1,archive2" // --archives
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+                                             // --archives
     val pyFiles = "py-file1,py-file2"        // --py-files

--- End diff --

YARN's `Client.scala` supports renaming for everything that uses the distributed cache, even if that's not explicitly called out in the docs.
Github user misutoth commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175546041

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler

+  var dir: File = null

--- End diff --

I wanted to make sure the directory is deleted even if the test fails.
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175544492

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val spath = path.split('#')
+      val renameAs = if (spath.length > 1) Some(spath(1)) else None
+      val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
+      resolved match {
+        case array: Array[String] if !renameAs.isEmpty && array.length>1 =>
+          throw new SparkException(
+            s"${spath(1)} resolves ambiguously to multiple files: ${array.mkString(",")}")
+        case array: Array[String] if !renameAs.isEmpty => array.map( _ + "#" + renameAs.get)

--- End diff --

Maybe we can find a more meaningful name for `array`; the current name makes the code hard to read.
Github user misutoth commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175543984

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw new RuntimeException("Unreachable production code")

--- End diff --

Otherwise execution just continues in the test itself, where `exitFn` does not stop it.
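The concern here can be sketched in isolation (all names below are hypothetical stand-ins for the SparkSubmit machinery): tests override the exit function so it does not actually stop the JVM, so without a throw after the error is printed, execution would fall through and the method would have to return something.

```scala
// Hypothetical test double: records the exit instead of performing it.
var exited = false
def exitFn(): Unit = { exited = true }
def printErrorAndExit(msg: String): Unit = { Console.err.println(msg); exitFn() }

// Stand-in for prepareSubmitEnvironment: in production printErrorAndExit
// never returns; under the test double it does, so the throw guards the
// control flow.
def prepare(fail: Boolean): Int =
  try {
    if (fail) throw new RuntimeException("boom") else 42
  } catch {
    case e: RuntimeException =>
      printErrorAndExit(e.getMessage)
      throw new IllegalStateException("Unreachable production code")
  }
```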
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175543124

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val spath = path.split('#')
+      val renameAs = if (spath.length > 1) Some(spath(1)) else None
+      val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)

--- End diff --

Nit: `resolveGlobPath`
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175541913

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw new RuntimeException("Unreachable production code")

--- End diff --

`throw new RuntimeException...`
Github user misutoth commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175541331

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw new RuntimeException("Unreachable production code")

--- End diff --

Which part do you mean is overkill?
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175540847

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -606,9 +612,12 @@ class SparkSubmitSuite
   }

   test("resolves command line argument paths correctly") {
+    val archive = Paths.get(dir.toPath.toString, "single.zip")
+    Files.createFile(archive)
     val jars = "/jar1,/jar2"                 // --jars
     val files = "local:/file1,file2"         // --files
-    val archives = "file:/archive1,archive2" // --archives
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+                                             // --archives
     val pyFiles = "py-file1,py-file2"        // --py-files

--- End diff --

OK, thanks.
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175540696

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler

+  var dir: File = null

--- End diff --

I meant more like: only put something here if it is used by more than a couple of tests. There are ~40 tests that would just create and delete this directory without any benefit.
Github user misutoth commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175540130

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -606,9 +612,12 @@ class SparkSubmitSuite
   }

   test("resolves command line argument paths correctly") {
+    val archive = Paths.get(dir.toPath.toString, "single.zip")
+    Files.createFile(archive)
     val jars = "/jar1,/jar2"                 // --jars
     val files = "local:/file1,file2"         // --files
-    val archives = "file:/archive1,archive2" // --archives
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+                                             // --archives
     val pyFiles = "py-file1,py-file2"        // --py-files

--- End diff --

According to the [doc](https://spark.apache.org/docs/latest/running-on-yarn.html), only `--files` and `--archives` support it.
Github user misutoth commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175539256

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler

+  var dir: File = null

--- End diff --

I was thinking about this too. I wanted to avoid making it an Option or doing a not-very-nice null check. I can do that later though...
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175533621

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val spath = path.split('#')
+      val renameAs = if (spath.length > 1) Some(spath(1)) else None
+      val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
+      resolved match {

--- End diff --

This can be simplified to something like this: `(renameAs, resolved) match...`
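The suggested tuple match can be sketched as a standalone function. The names below are stand-ins, and a plain `IllegalArgumentException` substitutes for Spark's `SparkException`; matching on the pair makes the three outcomes explicit without repeated `isEmpty`/length guards:

```scala
// Match on the (resolved paths, optional rename) pair, as suggested above.
// Hypothetical stand-in for the PR's match block; not Spark's actual code.
def applyRename(resolved: Array[String], renameAs: Option[String]): Array[String] =
  (resolved, renameAs) match {
    case (many, Some(_)) if many.length > 1 =>
      throw new IllegalArgumentException(
        s"resolves ambiguously to multiple files: ${many.mkString(",")}")
    case (files, Some(name)) => files.map(_ + "#" + name)
    case (files, None)       => files
  }

// applyRename(Array("a.zip"), Some("x")) yields Array("a.zip#x")
```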
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175521372

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -606,9 +612,12 @@ class SparkSubmitSuite
   }

   test("resolves command line argument paths correctly") {
+    val archive = Paths.get(dir.toPath.toString, "single.zip")
+    Files.createFile(archive)
     val jars = "/jar1,/jar2"                 // --jars
     val files = "local:/file1,file2"         // --files
-    val archives = "file:/archive1,archive2" // --archives
+    val archives = s"file:/archive1,${dir.toPath.toAbsolutePath.toString}/*.zip#archive3"
+                                             // --archives
     val pyFiles = "py-file1,py-file2"        // --py-files

--- End diff --

Does `--py-files` support renaming?
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175515840

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -105,11 +105,17 @@ class SparkSubmitSuite
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler

+  var dir: File = null

--- End diff --

Most of the tests don't use this dir at all. Why create it for all the tests?
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175529369

--- Diff: core/src/main/scala/org/apache/spark/deploy/DependencyUtils.scala ---
@@ -137,16 +137,29 @@ private[deploy] object DependencyUtils {
   def resolveGlobPaths(paths: String, hadoopConf: Configuration): String = {
     require(paths != null, "paths cannot be null.")
     Utils.stringToSeq(paths).flatMap { path =>
-      val uri = Utils.resolveURI(path)
-      uri.getScheme match {
-        case "local" | "http" | "https" | "ftp" => Array(path)
-        case _ =>
-          val fs = FileSystem.get(uri, hadoopConf)
-          Option(fs.globStatus(new Path(uri))).map { status =>
-            status.filter(_.isFile).map(_.getPath.toUri.toString)
-          }.getOrElse(Array(path))
+      val spath = path.split('#')
+      val renameAs = if (spath.length > 1) Some(spath(1)) else None
+      val resolved: Array[String] = resoloveGlobPath(spath(0), hadoopConf)
+      resolved match {
+        case array: Array[String] if !renameAs.isEmpty && array.length>1 =>

--- End diff --

Nit: `array.length > 1`
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175521925

--- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -657,6 +667,31 @@ class SparkSubmitSuite
     conf3.get(PYSPARK_PYTHON.key) should be ("python3.5")
   }

+  var cleanExit = false

--- End diff --

What is it used for?
Github user gaborgsomogyi commented on a diff in the pull request:
https://github.com/apache/spark/pull/20853#discussion_r175523620

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -245,6 +245,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
       args: SparkSubmitArguments,
       conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
+    try {
+      doPrepareSubmitEnvironment(args, conf)
+    } catch {
+      case e: SparkException =>
+        printErrorAndExit(e.getMessage)
+        throw new RuntimeException("Unreachable production code")

--- End diff --

Nit: I have a feeling it's a bit overkill compared to the other occurrences.