This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6748b48  [SPARK-27835][CORE] Resource Scheduling: change driver config from addresses
6748b48 is described below

commit 6748b486a9afe8370786efb64a8c9f3470c62dcf
Author: Thomas Graves <tgra...@nvidia.com>
AuthorDate: Thu May 30 07:51:06 2019 -0500

    [SPARK-27835][CORE] Resource Scheduling: change driver config from addresses
    
    ## What changes were proposed in this pull request?
    
    Change the Driver resource discovery argument for standalone mode to be a
    file rather than separate address configs per resource. This makes it
    consistent with how the Executor does it, keeps it more flexible for the
    future, and means fewer configs when you have multiple resources.
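
    For illustration, here is a minimal sketch of the new file format and the internal
    parsing call. The JSON shape (an array of objects with "name" and "addresses") and
    the `parseAllocatedFromJsonFile` signature come from the diff below; the path and
    values are made up, and since `ResourceDiscoverer` is `private[spark]` this only
    compiles from code inside the org.apache.spark packages (e.g. a test).

    ```scala
    import java.nio.file.Files

    import org.apache.spark.ResourceDiscoverer

    // Hypothetical allocated-resources file: a JSON array of {"name", "addresses"} objects.
    val resourcesJson =
      """[{"name": "gpu", "addresses": ["0", "1", "8"]},
        |{"name": "fpga", "addresses": ["f1"]}]""".stripMargin
    val resourcesFile = Files.createTempFile("driverResources", ".json")
    Files.write(resourcesFile, resourcesJson.getBytes())

    // Returns a Map of resource name -> ResourceInformation(name, addresses).
    val resources = ResourceDiscoverer.parseAllocatedFromJsonFile(resourcesFile.toString)
    assert(resources.keySet == Set("gpu", "fpga"))
    println(resources("gpu").name)  // "gpu"; its addresses are the parsed ["0", "1", "8"]
    ```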
    
    ## How was this patch tested?
    
    Unit tests and basic manual testing to make sure files were parsed
    properly.
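
    For context, a rough sketch of the wiring the updated SparkContextSuite test (in the
    diff below) exercises. The config constants and DRIVER_RESOURCES_FILE are Spark-internal,
    so this is test-style code assumed to live in the org.apache.spark packages, and
    `resourcesFile` is assumed to be the path to a JSON file like the one sketched above.

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.internal.config._

    // Point the driver at an allocated-resources file instead of per-resource
    // address configs; discovery scripts remain the path for other cluster managers.
    val conf = new SparkConf()
      .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" + SPARK_RESOURCE_COUNT_SUFFIX, "1")
      .set(DRIVER_RESOURCES_FILE, resourcesFile)  // new internal config added by this change
      .setMaster("local-cluster[1, 1, 1024]")
      .setAppName("test-cluster")
    val sc = new SparkContext(conf)
    // The driver's gpu resource now comes from the file rather than from address configs.
    ```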
    
    Closes #24730 from tgravescs/SPARK-27835-driver-resourcesFile.
    
    Authored-by: Thomas Graves <tgra...@nvidia.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../org/apache/spark/ResourceDiscoverer.scala      | 20 ++++++++++++++++--
 .../main/scala/org/apache/spark/SparkContext.scala | 23 ++++++++-------------
 .../executor/CoarseGrainedExecutorBackend.scala    | 24 +++++-----------------
 .../org/apache/spark/internal/config/package.scala | 10 ++++++++-
 .../scala/org/apache/spark/SparkContextSuite.scala | 22 ++++++++++++++++----
 .../CoarseGrainedExecutorBackendSuite.scala        |  4 ++--
 6 files changed, 61 insertions(+), 42 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
index d3b3860..e5ae202 100644
--- a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
+++ b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark
 
-import java.io.File
+import java.io.{BufferedInputStream, File, FileInputStream}
 
 import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.databind.exc.MismatchedInputException
 import org.json4s.{DefaultFormats, MappingException}
-import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
@@ -132,4 +132,20 @@ private[spark] object ResourceDiscoverer extends Logging {
       }
     }
   }
+
+  def parseAllocatedFromJsonFile(resourcesFile: String): Map[String, ResourceInformation] = {
+    implicit val formats = DefaultFormats
+    // case class to make json4s parsing easy
+    case class JsonResourceInformation(val name: String, val addresses: Array[String])
+    val resourceInput = new BufferedInputStream(new FileInputStream(resourcesFile))
+    val resources = try {
+      parse(resourceInput).extract[Seq[JsonResourceInformation]]
+    } catch {
+      case e@(_: MappingException | _: MismatchedInputException | _: ClassCastException) =>
+        throw new SparkException(s"Exception parsing the resources in $resourcesFile", e)
+    } finally {
+      resourceInput.close()
+    }
+    resources.map(r => (r.name, new ResourceInformation(r.name, r.addresses))).toMap
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 878010d..6266ce6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -365,29 +365,24 @@ class SparkContext(config: SparkConf) extends Logging {
 
   /**
    * Checks to see if any resources (GPU/FPGA/etc) are available to the driver by looking
-   * at and processing the spark.driver.resource.resourceName.addresses and
+   * at and processing the spark.driver.resourcesFile and
    * spark.driver.resource.resourceName.discoveryScript configs. The configs have to be
    * present when the driver starts, setting them after startup does not work.
    *
-   * If any resource addresses configs were specified then assume all resources will be specified
-   * in that way. Otherwise use the discovery scripts to find the resources. Users should
-   * not really be setting the addresses config directly and should not be mixing methods
-   * for different types of resources since the addresses config is meant for Standalone mode
+   * If a resources file was specified then assume all resources will be specified
+   * in that file. Otherwise use the discovery scripts to find the resources. Users should
+   * not be setting the resources file config directly and should not be mixing methods
+   * for different types of resources since the resources file config is meant for Standalone mode
    * and other cluster managers should use the discovery scripts.
    */
   private def setupDriverResources(): Unit = {
     // Only call getAllWithPrefix once and filter on those since there could be a lot of spark
     // configs.
     val allDriverResourceConfs = _conf.getAllWithPrefix(SPARK_DRIVER_RESOURCE_PREFIX)
-    val resourcesWithAddrsInConfs =
-      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_ADDRESSES_SUFFIX)
-
-    _resources = if (resourcesWithAddrsInConfs.nonEmpty) {
-      resourcesWithAddrsInConfs.map { case (rName, addrString) =>
-        val addrsArray = addrString.split(",").map(_.trim())
-        (rName -> new ResourceInformation(rName, addrsArray))
-      }.toMap
-    } else {
+    val resourcesFile = _conf.get(DRIVER_RESOURCES_FILE)
+    _resources = resourcesFile.map { rFile => {
+      ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
+    }}.getOrElse {
       // we already have the resources confs here so just pass in the unique resource names
       // rather then having the resource discoverer reparse all the configs.
       val uniqueResources = SparkConf.getBaseOfConfigs(allDriverResourceConfs)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index fac4d40..b262c23 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -87,25 +87,11 @@ private[spark] class CoarseGrainedExecutorBackend(
   def parseOrFindResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = {
     // only parse the resources if a task requires them
     val resourceInfo = if (env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX).nonEmpty) {
-      val actualExecResources = resourcesFile.map { resourceFileStr => {
-        val source = new BufferedInputStream(new FileInputStream(resourceFileStr))
-        val resourceMap = try {
-          val parsedJson = parse(source).asInstanceOf[JArray].arr
-          parsedJson.map { json =>
-            val name = (json \ "name").extract[String]
-            val addresses = (json \ "addresses").extract[Array[String]]
-            new ResourceInformation(name, addresses)
-          }.map(x => (x.name -> x)).toMap
-        } catch {
-          case e @ (_: MappingException | _: MismatchedInputException) =>
-            throw new SparkException(
-              s"Exception parsing the resources in $resourceFileStr", e)
-        } finally {
-          source.close()
-        }
-        resourceMap
-      }}.getOrElse(ResourceDiscoverer.discoverResourcesInformation(env.conf,
-        SPARK_EXECUTOR_RESOURCE_PREFIX))
+      val actualExecResources = resourcesFile.map { rFile => {
+        ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
+      }}.getOrElse {
+        ResourceDiscoverer.discoverResourcesInformation(env.conf, SPARK_EXECUTOR_RESOURCE_PREFIX)
+      }
 
       if (actualExecResources.isEmpty) {
         throw new SparkException("User specified resources per task via: " +
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 469b54c..a5d36b5 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -36,9 +36,17 @@ package object config {
   private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
 
   private[spark] val SPARK_RESOURCE_COUNT_SUFFIX = ".count"
-  private[spark] val SPARK_RESOURCE_ADDRESSES_SUFFIX = ".addresses"
   private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
 
+  private[spark] val DRIVER_RESOURCES_FILE =
+    ConfigBuilder("spark.driver.resourcesFile")
+      .internal()
+      .doc("Path to a file containing the resources allocated to the driver. " 
+
+        "The file should be formatted as a JSON array of ResourceInformation 
objects. " +
+        "Only used internally in standalone mode.")
+      .stringConf
+      .createOptional
+
   private[spark] val DRIVER_CLASS_PATH =
     ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional
 
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index abd7d8a..ded914d 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -29,10 +29,13 @@ import scala.concurrent.duration._
 
 import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}
+import org.json4s.JsonAST.JArray
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods.{compact, render}
 import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually
 
@@ -760,19 +763,30 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     }
   }
 
-  test("test gpu driver resource address and discovery under local-cluster 
mode") {
+  private def writeJsonFile(dir: File, strToWrite: JArray): String = {
+    val f1 = File.createTempFile("test-resource-parser1", "", dir)
+    JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes())
+    f1.getPath()
+  }
+
+  test("test gpu driver resource files and discovery under local-cluster 
mode") {
     withTempDir { dir =>
       val gpuFile = new File(dir, "gpuDiscoverScript")
       val scriptPath = mockDiscoveryScript(gpuFile,
         """'{"name": "gpu","addresses":["5", "6"]}'""")
 
+      val gpusAllocated =
+        ("name" -> "gpu") ~
+        ("addresses" -> Seq("0", "1", "8"))
+      val ja = JArray(List(gpusAllocated))
+      val resourcesFile = writeJsonFile(dir, ja)
+
       val conf = new SparkConf()
         .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
           SPARK_RESOURCE_COUNT_SUFFIX, "1")
         .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
-          SPARK_RESOURCE_ADDRESSES_SUFFIX, "0, 1, 8")
-        .set(SPARK_DRIVER_RESOURCE_PREFIX + "gpu" +
           SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
+        .set(DRIVER_RESOURCES_FILE, resourcesFile)
         .setMaster("local-cluster[1, 1, 1024]")
         .setAppName("test-cluster")
       sc = new SparkContext(conf)
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 43913d1..b3d16d1 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.executor
 
 
-import java.io.{File, PrintWriter}
+import java.io.File
 import java.net.URL
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files => JavaFiles}
@@ -26,7 +26,7 @@ import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, O
 import java.util.EnumSet
 
 import com.google.common.io.Files
-import org.json4s.JsonAST.{JArray, JObject, JString}
+import org.json4s.JsonAST.{JArray, JObject}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods.{compact, render}
 import org.mockito.Mockito.when


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
