maropu commented on a change in pull request #30486:
URL: https://github.com/apache/spark/pull/30486#discussion_r530054680



##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -422,6 +426,8 @@ class SparkContext(config: SparkConf) extends Logging {
     _jars = Utils.getUserJars(_conf)
     _files = 
_conf.getOption(FILES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
       .toSeq.flatten
+    _archives = 
_conf.getOption(ARCHIVES.key).map(_.split(",")).map(_.filter(_.nonEmpty))
+      .toSeq.flatten

Review comment:
       nit: `_archives = 
_conf.getOption(ARCHIVES.key).map(Utils.stringToSeq).toSeq.flatten`?

##########
File path: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
##########
@@ -512,6 +513,8 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
         |  --files FILES               Comma-separated list of files to be 
placed in the working
         |                              directory of each executor. File paths 
of these files
         |                              in executors can be accessed via 
SparkFiles.get(fileName).
+        |  --archives ARCHIVES         Comma separated list of archives to be 
extracted into the

Review comment:
       nit: `Comma separated` -> `Comma-separated` for consistency?

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1803,6 +1803,16 @@ package object config {
     .toSequence
     .createWithDefault(Nil)
 
+  private[spark] val ARCHIVES = ConfigBuilder("spark.archives")
+    .version("3.1.0")
+    .doc("Comma separated list of archives to be extracted into the working 
directory of each " +

Review comment:
       btw, we don't need to describe something here about the relationship (or 
difference?) between `park.files` and `spark.archives`? 

##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -494,7 +495,8 @@ private[spark] object Utils extends Logging {
       securityMgr: SecurityManager,
       hadoopConf: Configuration,
       timestamp: Long,
-      useCache: Boolean): File = {
+      useCache: Boolean,
+      shouldUntar: Boolean = true): File = {

Review comment:
       In L487, could you document what the new param `shouldUntar` is?

##########
File path: core/src/main/scala/org/apache/spark/executor/Executor.scala
##########
@@ -230,16 +232,17 @@ private[spark] class Executor(
   private val appStartTime = conf.getLong("spark.app.startTime", 0)
 
   // To allow users to distribute plugins and their required files
-  // specified by --jars and --files on application submission, those 
jars/files should be
-  // downloaded and added to the class loader via updateDependencies.
-  // This should be done before plugin initialization below
+  // specified by --jars, --files and --archives on application submission, 
those
+  // jars/files/archives should be downloaded and added to the class loader vi

Review comment:
       nit: `vi` -> `via`

##########
File path: core/src/main/scala/org/apache/spark/SparkContext.scala
##########
@@ -1568,21 +1613,38 @@ class SparkContext(config: SparkConf) extends Logging {
 
     val key = if (!isLocal && scheme == "file") {
       env.rpcEnv.fileServer.addFile(new File(uri.getPath))
+    } else if (uri.getScheme == null) {
+      schemeCorrectedURI.toString
+    } else if (isArchive) {
+      uri.toString
     } else {
-        if (uri.getScheme == null) {
-          schemeCorrectedURI.toString
-        } else {
-          path
-        }
+      path
     }
+
     val timestamp = if (addedOnSubmit) startTime else System.currentTimeMillis
-    if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
+    if (!isArchive && addedFiles.putIfAbsent(key, timestamp).isEmpty) {
       logInfo(s"Added file $path at $key with timestamp $timestamp")
       // Fetch the file locally so that closures which are run on the driver 
can still use the
       // SparkFiles API to access files.
       Utils.fetchFile(uri.toString, new File(SparkFiles.getRootDirectory()), 
conf,
         env.securityManager, hadoopConfiguration, timestamp, useCache = false)
       postEnvironmentUpdate()
+    } else if (
+      isArchive &&
+        addedArchives.putIfAbsent(
+          UriBuilder.fromUri(new 
URI(key)).fragment(uri.getFragment).build().toString,
+          timestamp).isEmpty) {
+      logInfo(s"Added archive $path at $key with timestamp $timestamp")
+      val uriToDownload = UriBuilder.fromUri(new 
URI(key)).fragment(null).build()
+      val source = Utils.fetchFile(uriToDownload.toString, 
Utils.createTempDir(), conf,
+        env.securityManager, hadoopConfiguration, timestamp, useCache = false, 
shouldUntar = false)
+      logInfo(s"Unpacking an archive $path")

Review comment:
       How about logging source and dest, too?

##########
File path: core/src/main/scala/org/apache/spark/executor/Executor.scala
##########
@@ -907,32 +911,49 @@ private[spark] class Executor(
    * Download any missing dependencies if we receive a new set of files and 
JARs from the
    * SparkContext. Also adds any new JARs we fetched to the class loader.
    */
-  private def updateDependencies(newFiles: Map[String, Long], newJars: 
Map[String, Long]): Unit = {
+  private def updateDependencies(
+      newFiles: Map[String, Long],
+      newJars: Map[String, Long],
+      newArchives: Map[String, Long]): Unit = {
     lazy val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
     synchronized {
       // Fetch missing dependencies
       for ((name, timestamp) <- newFiles if currentFiles.getOrElse(name, -1L) 
< timestamp) {
-        logInfo("Fetching " + name + " with timestamp " + timestamp)
+        logInfo(s"Fetching $name with timestamp $timestamp")
         // Fetch file with useCache mode, close cache for local mode.
         Utils.fetchFile(name, new File(SparkFiles.getRootDirectory()), conf,
           env.securityManager, hadoopConf, timestamp, useCache = !isLocal)
         currentFiles(name) = timestamp
       }
+      for ((name, timestamp) <- newArchives if currentArchives.getOrElse(name, 
-1L) < timestamp) {
+        logInfo(s"Fetching $name with timestamp $timestamp")
+        val sourceURI = new URI(name)
+        val uriToDownload = 
UriBuilder.fromUri(sourceURI).fragment(null).build()
+        val source = Utils.fetchFile(uriToDownload.toString, 
Utils.createTempDir(), conf,
+          env.securityManager, hadoopConf, timestamp, useCache = false, 
shouldUntar = false)

Review comment:
       We cannot use cache for this purpose?

##########
File path: docs/configuration.md
##########
@@ -784,6 +784,17 @@ Apart from these, the following properties are also 
available, and may be useful
   </td>
   <td>2.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.archives</code></td>
+  <td></td>
+  <td>
+    Comma separated list of archives to be extracted into the working 
directory of each executor.

Review comment:
       ditto: `Comma separated` -> `Comma-separated`

##########
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##########
@@ -1803,6 +1803,16 @@ package object config {
     .toSequence
     .createWithDefault(Nil)
 
+  private[spark] val ARCHIVES = ConfigBuilder("spark.archives")
+    .version("3.1.0")
+    .doc("Comma separated list of archives to be extracted into the working 
directory of each " +

Review comment:
       nit: `Comma separated` -> `Comma-separated` for consistency?
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to