xkrogen commented on a change in pull request #29966:
URL: https://github.com/apache/spark/pull/29966#discussion_r533573706



##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,112 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+    Seq(
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
+    ).map(sys.props.get(_).orNull)
+  }
+
+  private def parseQueryParams(uriQuery: String): (Boolean, String) = {

Review comment:
       can we document the fields of the return tuple?
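
   Something like this, for example (a doc-only sketch over the existing signature; exact wording up to you):
   ```scala
   /**
    * Parse the query part of an Ivy URI.
    *
    * @param uriQuery the raw query string of the Ivy URI (may be null)
    * @return a pair `(transitive, exclusionList)`: `transitive` is whether to
    *         resolve transitive dependencies (defaults to false); `exclusionList`
    *         is a comma-separated `group:module` string of artifacts to exclude
    *         (empty when no `exclude` parameter is given)
    */
   private def parseQueryParams(uriQuery: String): (Boolean, String) = {
   ```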

##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,112 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+    Seq(
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
+    ).map(sys.props.get(_).orNull)
+  }
+
+  private def parseQueryParams(uriQuery: String): (Boolean, String) = {
+    if (uriQuery == null) {
+      (false, "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(_.length != 2)) {
+        throw new URISyntaxException(uriQuery, s"Invalid query string: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+      // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+      var transitive = false

Review comment:
       To be a bit more functional / Scala-idiomatic, I think we can do something like `groupedParams.get("transitive").flatMap(_.takeRight(1).headOption).exists(_._2 == "true")` instead of the `foreach` call
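
   i.e., something like this against the `groupedParams` map built above (untested sketch):
   ```scala
   // Take the value of the last `transitive` parameter, defaulting to false;
   // avoids the mutable `var` and the side-effecting `foreach`.
   val transitive: Boolean = groupedParams
     .get("transitive")                  // Option[Array[(String, String)]]
     .flatMap(_.takeRight(1).headOption) // last occurrence, if any
     .exists(_._2 == "true")             // only the literal "true" enables it
   ```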

##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,112 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+    Seq(
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
+    ).map(sys.props.get(_).orNull)
+  }
+
+  private def parseQueryParams(uriQuery: String): (Boolean, String) = {
+    if (uriQuery == null) {
+      (false, "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(_.length != 2)) {
+        throw new URISyntaxException(uriQuery, s"Invalid query string: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+      // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+      var transitive = false
+      groupedParams.get("transitive").foreach { params =>
+        if (params.length > 1) {
+          logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+            " If there are multiple `transitive` parameter, we will select the last one")
+        }
+        params.map(_._2).foreach {
+          case "true" => transitive = true
+          case _ => transitive = false
+        }
+      }
+      // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+      // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+      // in a excluded list.
+      val exclusionList = groupedParams.get("exclude").map { params =>
+        params.flatMap { case (_, excludeString) =>
+          val excludes = excludeString.split(",")
+          if (excludes.exists(_.split(":").length != 2)) {

Review comment:
       Similar comment as `mapTokens`: verify that none of the elements of the split result are empty
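
   e.g. (sketch, reusing the `StringUtils` import already in the file):
   ```scala
   val excludes = excludeString.split(",")
   // Reject entries that are not exactly `group:module`, or where either side
   // of the ':' is blank (e.g. "org:", ":module", or an empty entry).
   if (excludes.exists { e =>
       val parts = e.split(":")
       parts.length != 2 || parts.exists(p => StringUtils.isBlank(p))
     }) {
     throw new URISyntaxException(excludeString, "Invalid exclude string: " +
       "expected 'org:module,org:module,..', found " + excludeString)
   }
   ```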

##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,112 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+    Seq(
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
+    ).map(sys.props.get(_).orNull)
+  }
+
+  private def parseQueryParams(uriQuery: String): (Boolean, String) = {
+    if (uriQuery == null) {
+      (false, "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(_.length != 2)) {

Review comment:
       maybe
   ```
   mapTokens.exists(token => token.length != 2 || StringUtils.isBlank(token(0)) || StringUtils.isBlank(token(1)))
   ```
   to validate that the tokens are well-formed?

##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,112 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+    Seq(
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
+    ).map(sys.props.get(_).orNull)
+  }
+
+  private def parseQueryParams(uriQuery: String): (Boolean, String) = {
+    if (uriQuery == null) {
+      (false, "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(_.length != 2)) {
+        throw new URISyntaxException(uriQuery, s"Invalid query string: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+      // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+      var transitive = false
+      groupedParams.get("transitive").foreach { params =>
+        if (params.length > 1) {
+          logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+            " If there are multiple `transitive` parameter, we will select the last one")
+        }
+        params.map(_._2).foreach {
+          case "true" => transitive = true
+          case _ => transitive = false
+        }
+      }
+      // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+      // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+      // in a excluded list.
+      val exclusionList = groupedParams.get("exclude").map { params =>
+        params.flatMap { case (_, excludeString) =>

Review comment:
       Maybe `params.map(_._2).flatMap { excludeString =>` ? I think it's a little more readable than the extraction, but either seems fine.
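
   i.e., the whole expression would read (sketch, behavior unchanged):
   ```scala
   val exclusionList = groupedParams.get("exclude").map { params =>
     params.map(_._2).flatMap { excludeString =>
       val excludes = excludeString.split(",")
       if (excludes.exists(_.split(":").length != 2)) {
         throw new URISyntaxException(excludeString, "Invalid exclude string: " +
           "expected 'org:module,org:module,..', found " + excludeString)
       }
       excludes
     }.mkString(",")
   }.getOrElse("")
   ```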

##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,115 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+    Seq(
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
+    ).map(sys.props.get(_).orNull)
+  }
+
+  def parseQueryParams(uriQuery: String): (Boolean, String) = {
+    if (uriQuery == null) {
+      (false, "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(_.length != 2)) {
+        throw new URISyntaxException(uriQuery, s"Invalid query string: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+      // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+      var transitive = false
+      groupedParams.get("transitive").foreach { params =>
+        if (params.length > 1) {
+          logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+            " If there are multiple `transitive` parameter, we will select the last one")
+        }
+        params.map(_._2).foreach(value => {
+          if (value == "true") {
+            transitive = true
+          } else if (value == "false") {
+            transitive = false
+          }
+        })
+      }
+      // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+      // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+      // in a excluded list.
+      val exclusionList = groupedParams.get("exclude").map { params =>
+        params.flatMap { case (_, excludeString) =>
+          val excludes = excludeString.split(",")
+          if (excludes.exists(_.split(":").length != 2)) {
+            throw new URISyntaxException(excludeString, "Invalid exclude string: " +
+              "expected 'org:module,org:module,..', found " + excludeString)
+          }
+          excludes
+        }.mkString(",")
+      }.getOrElse("")
+
+      (transitive, exclusionList)

Review comment:
       Seems like we should at least warn on invalid param?
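
   Something along these lines in the match would do it (sketch; assumes we want to fall back to the default rather than fail):
   ```scala
   params.map(_._2).foreach {
     case "true" => transitive = true
     case "false" => transitive = false
     case invalid =>
       logWarning(s"Invalid value `$invalid` for `transitive` parameter," +
         " expected `true` or `false`; using the default value (false)")
       transitive = false
   }
   ```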

##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,112 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {
+    Seq(
+      "spark.jars.excludes",
+      "spark.jars.packages",
+      "spark.jars.repositories",
+      "spark.jars.ivy",
+      "spark.jars.ivySettings"
+    ).map(sys.props.get(_).orNull)
+  }
+
+  private def parseQueryParams(uriQuery: String): (Boolean, String) = {
+    if (uriQuery == null) {
+      (false, "")
+    } else {
+      val mapTokens = uriQuery.split("&").map(_.split("="))
+      if (mapTokens.exists(_.length != 2)) {
+        throw new URISyntaxException(uriQuery, s"Invalid query string: $uriQuery")
+      }
+      val groupedParams = mapTokens.map(kv => (kv(0), kv(1))).groupBy(_._1)
+      // Parse transitive parameters (e.g., transitive=true) in an ivy URL, default value is false
+      var transitive = false
+      groupedParams.get("transitive").foreach { params =>
+        if (params.length > 1) {
+          logWarning("It's best to specify `transitive` parameter in ivy URL query only once." +
+            " If there are multiple `transitive` parameter, we will select the last one")
+        }
+        params.map(_._2).foreach {
+          case "true" => transitive = true
+          case _ => transitive = false
+        }
+      }
+      // Parse an excluded list (e.g., exclude=org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http)
+      // in an ivy URL. When download ivy URL jar, Spark won't download transitive jar
+      // in a excluded list.
+      val exclusionList = groupedParams.get("exclude").map { params =>
+        params.flatMap { case (_, excludeString) =>
+          val excludes = excludeString.split(",")
+          if (excludes.exists(_.split(":").length != 2)) {
+            throw new URISyntaxException(excludeString, "Invalid exclude string: " +
+              "expected 'org:module,org:module,..', found " + excludeString)
+          }
+          excludes
+        }.mkString(",")
+      }.getOrElse("")
+
+      (transitive, exclusionList)
+    }
+  }
+
+  /**
+   * Download Ivy URIs dependency jars.
+   *
+   * @param uri Ivy uri need to be downloaded. The URI format should be:
+   *              `ivy://group:module:version[?query]`
+   *            Ivy URI query part format should be:
+   *              `parameter=value&parameter=value...`
+   *            Note that currently ivy URI query part support two parameters:
+   *             1. transitive: whether to download dependent jars related to your ivy URL.
+   *                transitive=false or `transitive=true`, if not set, the default value is false.
+   *             2. exclude: exclusion list when download ivy URL jar and dependency jars.
+   *                The `exclude` parameter content is a ',' separated `group:module` pair string :
+   *                `exclude=group:module,group:module...`
+   * @return Comma separated string list of URIs of downloaded jars
+   */
+  def resolveMavenDependencies(uri: URI): Seq[String] = {
+    val Seq(_, _, repositories, ivyRepoPath, ivySettingsPath) =
+      DependencyUtils.getIvyProperties()
+    val authority = uri.getAuthority
+    if (authority == null) {
+      throw new URISyntaxException(
+        authority, "Invalid url: Expected 'org:module:version', found null")

Review comment:
       Maybe we should include the whole URI here instead of just `authority`? This could lead to more helpful messages if you accidentally mis-format it, e.g. by missing a slash.
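
   e.g. (sketch):
   ```scala
   if (authority == null) {
     throw new URISyntaxException(
       uri.toString, s"Invalid Ivy URI: expected 'ivy://org:module:version', found $uri")
   }
   ```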

##########
File path: core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
##########
@@ -15,22 +15,112 @@
  * limitations under the License.
  */
 
-package org.apache.spark.deploy
+package org.apache.spark.util
 
 import java.io.File
-import java.net.URI
+import java.net.{URI, URISyntaxException}
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging
-import org.apache.spark.util.{MutableURLClassLoader, Utils}
 
-private[deploy] object DependencyUtils extends Logging {
+private[spark] object DependencyUtils extends Logging {
+
+  def getIvyProperties(): Seq[String] = {

Review comment:
       `Seq[String]` seems like an odd return value for this. Maybe a tuple would be more appropriate? Or even a case class? For either seq or tuple I think we need to add some Scaladoc to tell callers how to unpack it.

   Also I wonder if we should be returning `Option[String]` instead of `null` for missing values?
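
   e.g. something like this (sketch; `IvyProperties` and its field names are just a suggestion):
   ```scala
   // Hypothetical case class: callers get named, typed fields instead of a
   // positional Seq with nulls for missing values.
   private[spark] case class IvyProperties(
       packagesExclusions: Option[String],
       packages: Option[String],
       repositories: Option[String],
       ivyRepoPath: Option[String],
       ivySettingsPath: Option[String])

   def getIvyProperties(): IvyProperties = {
     val Seq(excludes, packages, repositories, ivyRepoPath, ivySettingsPath) = Seq(
       "spark.jars.excludes",
       "spark.jars.packages",
       "spark.jars.repositories",
       "spark.jars.ivy",
       "spark.jars.ivySettings"
     ).map(sys.props.get)
     IvyProperties(excludes, packages, repositories, ivyRepoPath, ivySettingsPath)
   }
   ```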




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


