This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 83f753e  [SPARK-34472][YARN] Ship ivySettings file to driver in cluster mode
83f753e is described below

commit 83f753e4e1412a896243f4016600552c0110c1b0
Author: Shardul Mahadik <smaha...@linkedin.com>
AuthorDate: Tue Apr 20 13:35:57 2021 -0500

    [SPARK-34472][YARN] Ship ivySettings file to driver in cluster mode
    
    ### What changes were proposed in this pull request?
    
    In YARN, ship the `spark.jars.ivySettings` file to the driver when using `cluster`
    deploy mode so that `addJar` is able to find it in order to resolve ivy paths.
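    
    For illustration, a hypothetical submission that exercises this path (the settings
    path, class, and jar names below are made up):
    
    ```
    spark-submit \
      --master yarn \
      --deploy-mode cluster \
      --conf spark.jars.ivySettings=/etc/spark/ivysettings.xml \
      --class com.example.MyApp my-app.jar
    ```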
    
    ### Why are the changes needed?
    
    SPARK-33084 introduced support for Ivy paths in `sc.addJar` or Spark SQL `ADD JAR`.
    If a custom ivySettings file is specified via `spark.jars.ivySettings`, it is loaded at
    https://github.com/apache/spark/blob/b26e7b510bbaee63c4095ab47e75ff2a70e377d7/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L1280.
    However, this file is only accessible on the client machine. In YARN cluster mode,
    this file is not available on the driver, so `addJar` fails to find it.
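    
    As a minimal sketch of the affected API, assuming a running `SparkContext` (`sc`)
    and `SparkSession` (`spark`); the Ivy coordinates are made up:
    
    ```scala
    // Both forms resolve Ivy coordinates using the file given in spark.jars.ivySettings.
    sc.addJar("ivy://org.example:example-lib:1.0.0")
    spark.sql("ADD JAR ivy://org.example:example-lib:1.0.0")
    ```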
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit tests to verify that the `ivySettings` file is localized by the YARN
    client and that a YARN cluster mode application is able to find and load the
    `ivySettings` file.
    
    Closes #31591 from shardulm94/SPARK-34472.
    
    Authored-by: Shardul Mahadik <smaha...@linkedin.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../org/apache/spark/deploy/SparkSubmit.scala      |   7 +-
 docs/configuration.md                              |   7 +-
 .../org/apache/spark/deploy/yarn/Client.scala      |  57 +++++++++---
 .../spark/deploy/yarn/YarnClusterSuite.scala       | 103 +++++++++++++++++++++
 4 files changed, 161 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0e31fcf..36873c7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -1286,7 +1286,12 @@ private[spark] object SparkSubmitUtils extends Logging {
       settingsFile: String,
       remoteRepos: Option[String],
       ivyPath: Option[String]): IvySettings = {
-    val file = new File(settingsFile)
+    val uri = new URI(settingsFile)
+    val file = Option(uri.getScheme).getOrElse("file") match {
+      case "file" => new File(uri.getPath)
+      case scheme => throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
+        "spark.jars.ivySettings")
+    }
     require(file.exists(), s"Ivy settings file $file does not exist")
     require(file.isFile(), s"Ivy settings file $file is not a normal file")
     val ivySettings: IvySettings = new IvySettings
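
A standalone sketch (not part of the patch) of the scheme handling added above: a path
without a scheme is treated as `file://`, and any other scheme is rejected.

```scala
import java.io.File
import java.net.URI

// Mirrors the match in loadIvySettings: default to "file" when no scheme is given.
def settingsFileFor(path: String): File = {
  val uri = new URI(path)
  Option(uri.getScheme).getOrElse("file") match {
    case "file" => new File(uri.getPath)
    case scheme => throw new IllegalArgumentException(
      s"Scheme $scheme not supported in spark.jars.ivySettings")
  }
}

settingsFileFor("/etc/spark/ivysettings.xml")        // no scheme: treated as a local file
settingsFileFor("file:///etc/spark/ivysettings.xml") // explicit file:// scheme
// settingsFileFor("hdfs:///ivysettings.xml")        // would throw IllegalArgumentException
```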
diff --git a/docs/configuration.md b/docs/configuration.md
index d9bbddc..c6b462a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -771,7 +771,12 @@ Apart from these, the following properties are also available, and may be useful
     option <code>--repositories</code> or <code>spark.jars.repositories</code> will also be included.
     Useful for allowing Spark to resolve artifacts from behind a firewall e.g. via an in-house
     artifact server like Artifactory. Details on the settings file format can be
-    found at <a href="http://ant.apache.org/ivy/history/latest-milestone/settings.html">Settings Files</a>
+    found at <a href="http://ant.apache.org/ivy/history/latest-milestone/settings.html">Settings Files</a>.
+    Only paths with <code>file://</code> scheme are supported. Paths without a scheme are assumed to have
+    a <code>file://</code> scheme.
+    <p/>
+    When running in YARN cluster mode, this file will also be localized to the remote driver for dependency
+    resolution within <code>SparkContext#addJar</code>
   </td>
   <td>2.2.0</td>
 </tr>
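
For reference, a hypothetical `spark-defaults.conf` entry matching the documented
behavior (the path is illustrative):

```
spark.jars.ivySettings    file:///etc/spark/ivysettings.xml
```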
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 618faef..427202f 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -21,6 +21,7 @@ import java.io.{FileSystem => _, _}
 import java.net.{InetAddress, UnknownHostException, URI}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
+import java.nio.file.Files
 import java.util.{Locale, Properties, UUID}
 import java.util.zip.{ZipEntry, ZipOutputStream}
 
@@ -30,7 +31,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.util.control.NonFatal
 
 import com.google.common.base.Objects
-import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.fs.permission.FsPermission
@@ -518,6 +518,32 @@ private[spark] class Client(
       require(localizedPath != null, "Keytab file already distributed.")
     }
 
+    // If we passed in an ivySettings file, make sure we copy the file to the distributed cache
+    // in cluster mode so that the driver can access it
+    val ivySettings = sparkConf.getOption("spark.jars.ivySettings")
+    val ivySettingsLocalizedPath: Option[String] = ivySettings match {
+      case Some(ivySettingsPath) if isClusterMode =>
+        val uri = new URI(ivySettingsPath)
+        Option(uri.getScheme).getOrElse("file") match {
+          case "file" =>
+            val ivySettingsFile = new File(uri.getPath)
+            require(ivySettingsFile.exists(), s"Ivy settings file $ivySettingsFile not found")
+            require(ivySettingsFile.isFile(), s"Ivy settings file $ivySettingsFile is not a " +
+              "normal file")
+            // Generate a file name that can be used for the ivySettings file, that does not
+            // conflict with any user file.
+            val localizedFileName = Some(ivySettingsFile.getName() + "-" +
+              UUID.randomUUID().toString)
+            val (_, localizedPath) = distribute(ivySettingsPath, destName = localizedFileName)
+            require(localizedPath != null, "IvySettings file already distributed.")
+            Some(localizedPath)
+          case scheme =>
+            throw new IllegalArgumentException(s"Scheme $scheme not supported in " +
+              "spark.jars.ivySettings")
+        }
+      case _ => None
+    }
+
     /**
     * Add Spark to the cache. There are two settings that control what files to add to the cache:
     * - if a Spark archive is defined, use the archive. The archive is expected to contain
@@ -576,7 +602,7 @@ private[spark] class Client(
             jarsDir.listFiles().foreach { f =>
              if (f.isFile && f.getName.toLowerCase(Locale.ROOT).endsWith(".jar") && f.canRead) {
                 jarsStream.putNextEntry(new ZipEntry(f.getName))
-                Files.copy(f, jarsStream)
+                Files.copy(f.toPath, jarsStream)
                 jarsStream.closeEntry()
               }
             }
@@ -672,7 +698,18 @@ private[spark] class Client(
     val remoteFs = FileSystem.get(remoteConfArchivePath.toUri(), hadoopConf)
     cachedResourcesConf.set(CACHED_CONF_ARCHIVE, remoteConfArchivePath.toString())
 
-    val localConfArchive = new Path(createConfArchive().toURI())
+    val confsToOverride = Map.empty[String, String]
+    // If propagating the keytab to the AM, override the keytab name with the name of the
+    // distributed file.
+    amKeytabFileName.foreach { kt => confsToOverride.put(KEYTAB.key, kt) }
+
+    // If propagating the ivySettings file to the distributed cache, override the ivySettings
+    // file name with the name of the distributed file.
+    ivySettingsLocalizedPath.foreach { path =>
+      confsToOverride.put("spark.jars.ivySettings", path)
+    }
+
+    val localConfArchive = new Path(createConfArchive(confsToOverride).toURI())
     copyFileToRemote(destDir, localConfArchive, replication, symlinkCache, force = true,
       destName = Some(LOCALIZED_CONF_ARCHIVE))
 
@@ -701,8 +738,10 @@ private[spark] class Client(
    *
   * The archive also contains some Spark configuration. Namely, it saves the contents of
    * SparkConf in a file to be loaded by the AM process.
+   *
+   * @param confsToOverride configs that should be overridden when creating the final Spark conf file
    */
-  private def createConfArchive(): File = {
+  private def createConfArchive(confsToOverride: Map[String, String]): File = {
     val hadoopConfFiles = new HashMap[String, File]()
 
     // SPARK_CONF_DIR shows up in the classpath before HADOOP_CONF_DIR/YARN_CONF_DIR
@@ -764,7 +803,7 @@ private[spark] class Client(
             if url.getProtocol == "file" } {
         val file = new File(url.getPath())
         confStream.putNextEntry(new ZipEntry(file.getName()))
-        Files.copy(file, confStream)
+        Files.copy(file.toPath, confStream)
         confStream.closeEntry()
       }
 
@@ -775,7 +814,7 @@ private[spark] class Client(
       hadoopConfFiles.foreach { case (name, file) =>
         if (file.canRead()) {
          confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name"))
-          Files.copy(file, confStream)
+          Files.copy(file.toPath, confStream)
           confStream.closeEntry()
         }
       }
@@ -788,11 +827,7 @@ private[spark] class Client(
 
       // Save Spark configuration to a file in the archive.
       val props = confToProperties(sparkConf)
-
-      // If propagating the keytab to the AM, override the keytab name with the name of the
-      // distributed file.
-      amKeytabFileName.foreach { kt => props.setProperty(KEYTAB.key, kt) }
-
+      confsToOverride.foreach { case (k, v) => props.setProperty(k, v) }
       writePropertiesToArchive(props, SPARK_CONF_FILE, confStream)
 
       // Write the distributed cache config to the archive.
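
A small sketch (outside the patch) of how the override map behaves when the conf
archive is written; the names follow the diff above, the values are made up:

```scala
import java.util.{Properties, UUID}

val props = new Properties()
// Client-side value as the user originally set it.
props.setProperty("spark.jars.ivySettings", "/home/user/ivysettings.xml")

// Localized name generated in Client.scala: original file name plus a UUID suffix.
val localizedName = "ivysettings.xml-" + UUID.randomUUID().toString
val confsToOverride = Map("spark.jars.ivySettings" -> localizedName)

// Same loop as in createConfArchive: overrides win over the original values.
confsToOverride.foreach { case (k, v) => props.setProperty(k, v) }
assert(props.getProperty("spark.jars.ivySettings") == localizedName)
```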
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 9bc934d..26ff3bf 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -29,6 +29,7 @@ import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.ConverterUtils
 import org.scalatest.concurrent.Eventually._
+import org.scalatest.exceptions.TestFailedException
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 
@@ -368,6 +369,64 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     )
     checkResult(finalState, result, "true")
   }
+
+  def createEmptyIvySettingsFile: File = {
+    val emptyIvySettings = File.createTempFile("ivy", ".xml")
+    Files.write("<ivysettings />", emptyIvySettings, StandardCharsets.UTF_8)
+    emptyIvySettings
+  }
+
+  test("SPARK-34472: ivySettings file with no scheme or file:// scheme should 
be " +
+    "localized on driver in cluster mode") {
+    val emptyIvySettings = createEmptyIvySettingsFile
+    // For file:// URIs or URIs without scheme, make sure that ivySettings conf was changed
+    // to the localized file. So the expected ivySettings path on the driver will start with
+    // the file name and then some random UUID suffix
+    testIvySettingsDistribution(clientMode = false, emptyIvySettings.getAbsolutePath,
+      emptyIvySettings.getName, prefixMatch = true)
+    testIvySettingsDistribution(clientMode = false, s"file://${emptyIvySettings.getAbsolutePath}",
+      emptyIvySettings.getName, prefixMatch = true)
+  }
+
+  test("SPARK-34472: ivySettings file with no scheme or file:// scheme should 
retain " +
+    "user provided path in client mode") {
+    val emptyIvySettings = createEmptyIvySettingsFile
+    // In client mode, the file is present locally on the driver and so does not need to be
+    // distributed. So the user provided path should be kept as is.
+    testIvySettingsDistribution(clientMode = true, emptyIvySettings.getAbsolutePath,
+      emptyIvySettings.getAbsolutePath)
+    testIvySettingsDistribution(clientMode = true, s"file://${emptyIvySettings.getAbsolutePath}",
+      s"file://${emptyIvySettings.getAbsolutePath}")
+  }
+
+  test("SPARK-34472: ivySettings file with non-file:// schemes should throw an 
error") {
+    val emptyIvySettings = createEmptyIvySettingsFile
+    val e1 = intercept[TestFailedException] {
+      testIvySettingsDistribution(clientMode = false,
+        s"local://${emptyIvySettings.getAbsolutePath}", "")
+    }
+    assert(e1.getMessage.contains("IllegalArgumentException: " +
+      "Scheme local not supported in spark.jars.ivySettings"))
+    val e2 = intercept[TestFailedException] {
+      testIvySettingsDistribution(clientMode = false,
+        s"hdfs://${emptyIvySettings.getAbsolutePath}", "")
+    }
+    assert(e2.getMessage.contains("IllegalArgumentException: " +
+      "Scheme hdfs not supported in spark.jars.ivySettings"))
+  }
+
+  def testIvySettingsDistribution(clientMode: Boolean, ivySettingsPath: String,
+    expectedIvySettingsPrefixOnDriver: String, prefixMatch: Boolean = false): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val outFile = File.createTempFile("out", null, tempDir)
+    val finalState = runSpark(clientMode = clientMode,
+      mainClassName(YarnAddJarTest.getClass),
+      appArgs = Seq(result.getAbsolutePath, expectedIvySettingsPrefixOnDriver,
+        prefixMatch.toString),
+      extraConf = Map("spark.jars.ivySettings" -> ivySettingsPath),
+      outFile = Option(outFile))
+    checkResult(finalState, result, outFile = Option(outFile))
+  }
 }
 
 private[spark] class SaveExecutorInfo extends SparkListener {
@@ -583,6 +642,50 @@ private object YarnClasspathTest extends Logging {
 
 }
 
+private object YarnAddJarTest extends Logging {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 3) {
+      // scalastyle:off println
+      System.err.println(
+        s"""
+           |Invalid command line: ${args.mkString(" ")}
+           |
+           |Usage: YarnAddJarTest [result file] [expected ivy settings path] [prefix match]
+        """.stripMargin)
+      // scalastyle:on println
+      System.exit(1)
+    }
+
+    val resultPath = args(0)
+    val expectedIvySettingsPath = args(1)
+    val prefixMatch = args(2).toBoolean
+    val sc = new SparkContext(new SparkConf())
+
+    var result = "failure"
+    try {
+      val settingsFile = sc.getConf.get("spark.jars.ivySettings")
+      if (prefixMatch) {
+        assert(settingsFile !== expectedIvySettingsPath)
+        assert(settingsFile.startsWith(expectedIvySettingsPath))
+      } else {
+        assert(settingsFile === expectedIvySettingsPath)
+      }
+
+      val caught = intercept[RuntimeException] {
+        sc.addJar("ivy://org.fake-project.test:test:1.0.0")
+      }
+      if (caught.getMessage.contains("unresolved dependency: org.fake-project.test#test")) {
+        // "unresolved dependency" is expected as the dependency does not exist
+        // but an exception like "Ivy settings file <file> does not exist" should result in failure
+        result = "success"
+      }
+    } finally {
+      Files.write(result, new File(resultPath), StandardCharsets.UTF_8)
+      sc.stop()
+    }
+  }
+}
+
 private object YarnLauncherTestApp {
 
   def main(args: Array[String]): Unit = {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
