This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7056e00  [SPARK-27823][CORE] Refactor resource handling code
7056e00 is described below

commit 7056e004ee566fabbb9b22ddee2de55ef03260db
Author: Xiangrui Meng <m...@databricks.com>
AuthorDate: Tue Jun 18 17:18:17 2019 -0700

    [SPARK-27823][CORE] Refactor resource handling code
    
    ## What changes were proposed in this pull request?
    
    Continue the work from https://github.com/apache/spark/pull/24821. Refactor the resource handling code to make it more readable. Major changes:
    
    * Moved resource-related classes to `spark.resource` from `spark`.
    * Added `ResourceUtils` and helper classes so we don't need to deal with Spark conf directly (see the sketch below):
     * ResourceID: a resource identifier that also provides the associated conf keys
     * ResourceRequest/Allocation: abstractions for requested and allocated resources
    * Added `TestResourceIDs` to reference commonly used resource IDs in tests, like `spark.executor.resource.gpu`.
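    
    A minimal sketch (illustrative only, not part of this patch) of how `ResourceID` derives the per-resource conf keys; it assumes the caller sits inside the `org.apache.spark` package, since the class is `private[spark]`:
    
    ```scala
    import org.apache.spark.resource.ResourceID
    
    // ResourceID(componentName, resourceName) builds the per-resource config key names.
    val gpuId = ResourceID("spark.executor", "gpu")
    gpuId.amountConf           // "spark.executor.resource.gpu.amount"
    gpuId.discoveryScriptConf  // "spark.executor.resource.gpu.discoveryScript"
    gpuId.vendorConf           // "spark.executor.resource.gpu.vendor"
    ```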
    
    cc: tgravescs jiangxb1987 Ngone51
    
    ## How was this patch tested?
    
    Unit tests for the added utils, plus the existing unit tests. A hedged usage sketch follows below.
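    
    A sketch (not from this patch) of how the new utils could be exercised in a test, assuming json4s default serialization of `ResourceAllocation` and code that lives inside the `org.apache.spark` package (the resource classes are `private[spark]`):
    
    ```scala
    import java.nio.file.Files
    import org.json4s.{DefaultFormats, Extraction}
    import org.apache.spark.{SparkConf, TestUtils}
    import org.apache.spark.internal.config.SPARK_DRIVER_PREFIX
    import org.apache.spark.resource.{ResourceAllocation, ResourceID}
    import org.apache.spark.resource.ResourceUtils.getOrDiscoverAllResources
    
    implicit val formats = DefaultFormats
    val gpuId = ResourceID(SPARK_DRIVER_PREFIX, "gpu")
    val conf = new SparkConf().set(gpuId.amountConf, "2")
    // Write an allocations file the way a cluster manager would pass it via --resourcesFile.
    val allocations = Seq(ResourceAllocation(gpuId, Seq("0", "1")))
    val dir = Files.createTempDirectory("resources").toFile
    val file = TestUtils.createTempJsonFile(dir, "resources", Extraction.decompose(allocations))
    val resources = getOrDiscoverAllResources(conf, SPARK_DRIVER_PREFIX, Some(file))
    assert(resources("gpu").addresses.sameElements(Array("0", "1")))
    ```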
    
    Closes #24856 from mengxr/SPARK-27823.
    
    Lead-authored-by: Xiangrui Meng <m...@databricks.com>
    Co-authored-by: Thomas Graves <tgra...@nvidia.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../org/apache/spark/BarrierTaskContext.scala      |   1 +
 .../org/apache/spark/ResourceDiscoverer.scala      | 151 ------------
 .../org/apache/spark/ResourceInformation.scala     |  37 ---
 .../main/scala/org/apache/spark/SparkConf.scala    |  45 ----
 .../main/scala/org/apache/spark/SparkContext.scala |  94 +++-----
 .../main/scala/org/apache/spark/TaskContext.scala  |   4 +-
 .../scala/org/apache/spark/TaskContextImpl.scala   |   1 +
 .../main/scala/org/apache/spark/TestUtils.scala    |  30 ++-
 .../executor/CoarseGrainedExecutorBackend.scala    |  52 ++---
 .../org/apache/spark/internal/config/package.scala |  10 +-
 .../spark/resource/ResourceInformation.scala       |  87 +++++++
 .../org/apache/spark/resource/ResourceUtils.scala  | 191 +++++++++++++++
 .../scala/org/apache/spark/scheduler/Task.scala    |   1 +
 .../apache/spark/scheduler/TaskDescription.scala   |   2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  10 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  11 +-
 .../cluster/CoarseGrainedClusterMessage.scala      |   2 +-
 .../org/apache/spark/ResourceDiscovererSuite.scala | 236 -------------------
 .../scala/org/apache/spark/SparkConfSuite.scala    |  53 +----
 .../scala/org/apache/spark/SparkContextSuite.scala |  93 +++-----
 .../CoarseGrainedExecutorBackendSuite.scala        | 159 +++++++------
 .../org/apache/spark/executor/ExecutorSuite.scala  |   1 +
 .../spark/resource/ResourceInformationSuite.scala  |  64 +++++
 .../apache/spark/resource/ResourceUtilsSuite.scala | 259 +++++++++++++++++++++
 .../TestResourceIDs.scala}                         |  17 +-
 .../CoarseGrainedSchedulerBackendSuite.scala       |   8 +-
 .../scheduler/ExecutorResourceInfoSuite.scala      |   2 +-
 .../spark/scheduler/TaskDescriptionSuite.scala     |   4 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   |  10 +-
 .../spark/scheduler/TaskSetManagerSuite.scala      |   5 +-
 .../apache/spark/deploy/k8s/KubernetesUtils.scala  |  20 +-
 .../k8s/features/BasicDriverFeatureStep.scala      |   2 +-
 .../k8s/features/BasicExecutorFeatureStep.scala    |   2 +-
 .../k8s/features/BasicDriverFeatureStepSuite.scala |  14 +-
 .../features/BasicExecutorFeatureStepSuite.scala   |  37 ++-
 .../k8s/features/KubernetesFeaturesTestUtils.scala |   3 +-
 .../MesosFineGrainedSchedulerBackendSuite.scala    |   3 +-
 .../org/apache/spark/deploy/yarn/Client.scala      |   2 +-
 .../spark/deploy/yarn/ResourceRequestHelper.scala  |   9 +-
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |   2 +-
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala    |   8 +-
 .../YarnCoarseGrainedExecutorBackend.scala         |   3 +-
 .../org/apache/spark/deploy/yarn/ClientSuite.scala |   8 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     |   6 +-
 44 files changed, 908 insertions(+), 851 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
index cf957ff..c393df8 100644
--- a/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
@@ -26,6 +26,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.{RpcEndpointRef, RpcTimeout}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util._
diff --git a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala b/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
deleted file mode 100644
index e5ae202..0000000
--- a/core/src/main/scala/org/apache/spark/ResourceDiscoverer.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.{BufferedInputStream, File, FileInputStream}
-
-import com.fasterxml.jackson.core.JsonParseException
-import com.fasterxml.jackson.databind.exc.MismatchedInputException
-import org.json4s.{DefaultFormats, MappingException}
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.util.Utils.executeAndGetOutput
-
-/**
- * Discovers information about resources (GPUs/FPGAs/etc). It currently only supports
- * resources that have addresses.
- * This class finds resources by running and parsing the output of the user specified script
- * from the config spark.{driver/executor}.resource.{resourceName}.discoveryScript.
- * The output of the script it runs is expected to be JSON in the format of the
- * ResourceInformation class.
- *
- * For example:  {"name": "gpu", "addresses": ["0","1"]}
- */
-private[spark] object ResourceDiscoverer extends Logging {
-
-  private implicit val formats = DefaultFormats
-
-  /**
-   * This function will discover information about a set of resources by using the
-   * user specified script (spark.{driver/executor}.resource.{resourceName}.discoveryScript).
-   * It optionally takes a set of resource names or if that isn't specified
-   * it uses the config prefix passed in to look at the executor or driver configs
-   * to get the resource names. Then for each resource it will run the discovery script
-   * and get the ResourceInformation about it.
-   *
-   * @param sparkConf SparkConf
-   * @param confPrefix Driver or Executor resource prefix
-   * @param resourceNamesOpt Optionally specify resource names. If not set uses the resource
-   *                  configs based on confPrefix passed in to get the resource names.
-   * @return Map of resource name to ResourceInformation
-   */
-  def discoverResourcesInformation(
-      sparkConf: SparkConf,
-      confPrefix: String,
-      resourceNamesOpt: Option[Set[String]] = None
-      ): Map[String, ResourceInformation] = {
-    val resourceNames = resourceNamesOpt.getOrElse(
-      // get unique resource names by grabbing first part config with multiple periods,
-      // ie resourceName.count, grab resourceName part
-      SparkConf.getBaseOfConfigs(sparkConf.getAllWithPrefix(confPrefix))
-    )
-    resourceNames.map { rName => {
-      val rInfo = getResourceInfo(sparkConf, confPrefix, rName)
-      (rName -> rInfo)
-    }}.toMap
-  }
-
-  private def getResourceInfo(
-      sparkConf: SparkConf,
-      confPrefix: String,
-      resourceName: String): ResourceInformation = {
-    val discoveryConf = confPrefix + resourceName + SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX
-    val script = sparkConf.getOption(discoveryConf)
-    val result = if (script.nonEmpty) {
-      val scriptFile = new File(script.get)
-      // check that script exists and try to execute
-      if (scriptFile.exists()) {
-        try {
-          val output = executeAndGetOutput(Seq(script.get), new File("."))
-          val parsedJson = parse(output)
-          val name = (parsedJson \ "name").extract[String]
-          val addresses = (parsedJson \ "addresses").extract[Array[String]]
-          if (name != resourceName) {
-            throw new SparkException(s"Discovery script: ${script.get} 
specified via " +
-              s"$discoveryConf returned a resource name: $name that doesn't 
match the " +
-              s"config name: $resourceName")
-          }
-          new ResourceInformation(name, addresses)
-        } catch {
-          case e @ (_: SparkException | _: MappingException | _: JsonParseException) =>
-            throw new SparkException(s"Error running the resource discovery script: $scriptFile" +
-              s" for $resourceName", e)
-        }
-      } else {
-        throw new SparkException(s"Resource script: $scriptFile to discover 
$resourceName" +
-          s" doesn't exist!")
-      }
-    } else {
-      throw new SparkException(s"User is expecting to use $resourceName 
resources but " +
-        s"didn't specify a script via conf: $discoveryConf, to find them!")
-    }
-    result
-  }
-
-  /**
-   * Make sure the actual resources we have on startup are at least the number the user
-   * requested. Note that there is other code in SparkConf that makes sure we have executor configs
-   * for each task resource requirement and that they are large enough. This function
-   * is used by both driver and executors.
-   *
-   * @param requiredResources The resources that are required for us to run.
-   * @param actualResources The actual resources discovered.
-   */
-  def checkActualResourcesMeetRequirements(
-      requiredResources: Map[String, String],
-      actualResources: Map[String, ResourceInformation]): Unit = {
-    requiredResources.foreach { case (rName, reqCount) =>
-      val actualRInfo = actualResources.get(rName).getOrElse(
-        throw new SparkException(s"Resource: $rName required but wasn't 
discovered on startup"))
-
-      if (actualRInfo.addresses.size < reqCount.toLong) {
-        throw new SparkException(s"Resource: $rName, with addresses: " +
-          s"${actualRInfo.addresses.mkString(",")} " +
-          s"is less than what the user requested: $reqCount)")
-      }
-    }
-  }
-
-  def parseAllocatedFromJsonFile(resourcesFile: String): Map[String, ResourceInformation] = {
-    implicit val formats = DefaultFormats
-    // case class to make json4s parsing easy
-    case class JsonResourceInformation(val name: String, val addresses: Array[String])
-    val resourceInput = new BufferedInputStream(new FileInputStream(resourcesFile))
-    val resources = try {
-      parse(resourceInput).extract[Seq[JsonResourceInformation]]
-    } catch {
-      case e@(_: MappingException | _: MismatchedInputException | _: ClassCastException) =>
-        throw new SparkException(s"Exception parsing the resources in $resourcesFile", e)
-    } finally {
-      resourceInput.close()
-    }
-    resources.map(r => (r.name, new ResourceInformation(r.name, r.addresses))).toMap
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/ResourceInformation.scala
deleted file mode 100644
index 6a5b725..0000000
--- a/core/src/main/scala/org/apache/spark/ResourceInformation.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.apache.spark.annotation.Evolving
-
-/**
- * Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc.
- * The array of addresses are resource specific and its up to the user to interpret the address.
- *
- * One example is GPUs, where the addresses would be the indices of the GPUs
- *
- * @param name the name of the resource
- * @param addresses an array of strings describing the addresses of the resource
- */
-@Evolving
-class ResourceInformation(
-    val name: String,
-    val addresses: Array[String]) extends Serializable {
-
-  override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]"
-}
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e231a40..24be54e 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -416,14 +416,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
   }
 
   /**
-   * Get all parameters that start with `prefix` and end with 'suffix'
-   */
-  def getAllWithPrefixAndSuffix(prefix: String, suffix: String): Array[(String, String)] = {
-    getAll.filter { case (k, v) => k.startsWith(prefix) && k.endsWith(suffix) }
-      .map { case (k, v) => (k.substring(prefix.length, (k.length - suffix.length)), v) }
-  }
-
-  /**
    * Get a parameter as an integer, falling back to a default if not set
    * @throws NumberFormatException If the value cannot be interpreted as an integer
    */
@@ -507,14 +499,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     }
   }
 
-  /**
-   * Get task resource requirements.
-   */
-  private[spark] def getTaskResourceRequirements(): Map[String, Int] = {
-    getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX)
-      .withFilter { case (k, v) => k.endsWith(SPARK_RESOURCE_AMOUNT_SUFFIX)}
-      .map { case (k, v) => (k.dropRight(SPARK_RESOURCE_AMOUNT_SUFFIX.length), v.toInt)}.toMap
-  }
 
   /**
    * Checks for illegal or deprecated config settings. Throws an exception for the former. Not
@@ -805,35 +789,6 @@ private[spark] object SparkConf extends Logging {
   }
 
   /**
-   * A function to help parsing configs with multiple parts where the base and
-   * suffix could be one of many options. For instance configs like:
-   * spark.executor.resource.{resourceName}.{count/addresses}
-   * This function takes an Array of configs you got from the
-   * getAllWithPrefix function, selects only those that end with the suffix
-   * passed in and returns just the base part of the config before the first
-   * '.' and its value.
-   */
-  def getConfigsWithSuffix(
-      configs: Array[(String, String)],
-      suffix: String
-      ): Array[(String, String)] = {
-    configs.filter { case (rConf, _) => rConf.endsWith(suffix)}.
-      map { case (k, v) => (k.split('.').head, v) }
-  }
-
-  /**
-   * A function to help parsing configs with multiple parts where the base and
-   * suffix could be one of many options. For instance configs like:
-   * spark.executor.resource.{resourceName}.{count/addresses}
-   * This function takes an Array of configs you got from the
-   * getAllWithPrefix function and returns the base part of the config
-   * before the first '.'.
-   */
-  def getBaseOfConfigs(configs: Array[(String, String)]): Set[String] = {
-    configs.map { case (k, _) => k.split('.').head }.toSet
-  }
-
-  /**
    * Holds information about keys that have been deprecated and do not have a replacement.
    *
    * @param key The deprecated key.
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 4fc445b..a0d7aa7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -51,6 +51,8 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.metrics.source.JVMCPUSource
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
+import org.apache.spark.resource.{ResourceID, ResourceInformation}
+import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
@@ -363,43 +365,6 @@ class SparkContext(config: SparkConf) extends Logging {
     Utils.setLogLevel(org.apache.log4j.Level.toLevel(upperCased))
   }
 
-  /**
-   * Checks to see if any resources (GPU/FPGA/etc) are available to the driver by looking
-   * at and processing the spark.driver.resourcesFile and
-   * spark.driver.resource.resourceName.discoveryScript configs. The configs have to be
-   * present when the driver starts, setting them after startup does not work.
-   *
-   * If a resources file was specified then assume all resources will be specified
-   * in that file. Otherwise use the discovery scripts to find the resources. Users should
-   * not be setting the resources file config directly and should not be mixing methods
-   * for different types of resources since the resources file config is meant for Standalone mode
-   * and other cluster managers should use the discovery scripts.
-   */
-  private def setupDriverResources(): Unit = {
-    // Only call getAllWithPrefix once and filter on those since there could be a lot of spark
-    // configs.
-    val allDriverResourceConfs = _conf.getAllWithPrefix(SPARK_DRIVER_RESOURCE_PREFIX)
-    val resourcesFile = _conf.get(DRIVER_RESOURCES_FILE)
-    _resources = resourcesFile.map { rFile => {
-      ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
-    }}.getOrElse {
-      // we already have the resources confs here so just pass in the unique resource names
-      // rather then having the resource discoverer reparse all the configs.
-      val uniqueResources = SparkConf.getBaseOfConfigs(allDriverResourceConfs)
-      ResourceDiscoverer.discoverResourcesInformation(_conf, SPARK_DRIVER_RESOURCE_PREFIX,
-        Some(uniqueResources))
-    }
-    // verify the resources we discovered are what the user requested
-    val driverReqResourcesAndCounts =
-      SparkConf.getConfigsWithSuffix(allDriverResourceConfs, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
-    ResourceDiscoverer.checkActualResourcesMeetRequirements(driverReqResourcesAndCounts, _resources)
-
-    
logInfo("===============================================================================")
-    logInfo(s"Driver Resources:")
-    _resources.foreach { case (k, v) => logInfo(s"$k -> $v") }
-    
logInfo("===============================================================================")
-  }
-
   try {
     _conf = config.clone()
     _conf.validateSettings()
@@ -413,7 +378,8 @@ class SparkContext(config: SparkConf) extends Logging {
 
     _driverLogger = DriverLogger(_conf)
 
-    setupDriverResources()
+    val resourcesFileOpt = conf.get(DRIVER_RESOURCES_FILE)
+    _resources = getOrDiscoverAllResources(_conf, SPARK_DRIVER_PREFIX, resourcesFileOpt)
 
     // log out spark.app.name in the Spark driver logs
     logInfo(s"Submitted application: $appName")
@@ -2732,44 +2698,46 @@ object SparkContext extends Logging {
 
       // Calculate the max slots each executor can provide based on resources available on each
       // executor and resources required by each task.
-      val taskResourcesAndCount = sc.conf.getTaskResourceRequirements()
-      val executorResourcesAndCounts = sc.conf.getAllWithPrefixAndSuffix(
-        SPARK_EXECUTOR_RESOURCE_PREFIX, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
+      val taskResourceRequirements = parseTaskResourceRequirements(sc.conf)
+      val executorResourcesAndAmounts =
+        parseAllResourceRequests(sc.conf, SPARK_EXECUTOR_PREFIX)
+          .map(request => (request.id.resourceName, request.amount)).toMap
       var numSlots = execCores / taskCores
       var limitingResourceName = "CPU"
-      taskResourcesAndCount.foreach { case (rName, taskCount) =>
+
+      taskResourceRequirements.foreach { taskReq =>
         // Make sure the executor resources were specified through config.
-        val execCount = executorResourcesAndCounts.getOrElse(rName,
-          throw new SparkException(
-            s"The executor resource config: " +
-              s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_AMOUNT_SUFFIX} " +
-              "needs to be specified since a task requirement config: " +
-              s"${SPARK_TASK_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_AMOUNT_SUFFIX} was specified")
+        val execAmount = executorResourcesAndAmounts.getOrElse(taskReq.resourceName,
+          throw new SparkException("The executor resource config: " +
+            ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
+            " needs to be specified since a task requirement config: " +
+            ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+            " was specified")
         )
         // Make sure the executor resources are large enough to launch at least one task.
-        if (execCount.toLong < taskCount.toLong) {
-          throw new SparkException(
-            s"The executor resource config: " +
-              s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_AMOUNT_SUFFIX} " +
-              s"= $execCount has to be >= the task config: " +
-              s"${SPARK_TASK_RESOURCE_PREFIX + rName + 
SPARK_RESOURCE_AMOUNT_SUFFIX} = $taskCount")
+        if (execAmount < taskReq.amount) {
+          throw new SparkException("The executor resource config: " +
+            ResourceID(SPARK_EXECUTOR_PREFIX, taskReq.resourceName).amountConf +
+            s" = $execAmount has to be >= the requested amount in task resource config: " +
+            ResourceID(SPARK_TASK_PREFIX, taskReq.resourceName).amountConf +
+            s" = ${taskReq.amount}")
         }
         // Compare and update the max slots each executor can provide.
-        val resourceNumSlots = execCount.toInt / taskCount
+        val resourceNumSlots = execAmount / taskReq.amount
         if (resourceNumSlots < numSlots) {
           numSlots = resourceNumSlots
-          limitingResourceName = rName
+          limitingResourceName = taskReq.resourceName
         }
       }
       // There have been checks above to make sure the executor resources were specified and are
       // large enough if any task resources were specified.
-      taskResourcesAndCount.foreach { case (rName, taskCount) =>
-        val execCount = executorResourcesAndCounts(rName)
-        if (taskCount.toInt * numSlots < execCount.toInt) {
-          val message = s"The configuration of resource: $rName (exec = 
${execCount.toInt}, " +
-            s"task = ${taskCount}) will result in wasted resources due to 
resource " +
-            s"${limitingResourceName} limiting the number of runnable tasks 
per executor to: " +
-            s"${numSlots}. Please adjust your configuration."
+      taskResourceRequirements.foreach { taskReq =>
+        val execAmount = executorResourcesAndAmounts(taskReq.resourceName)
+        if (taskReq.amount * numSlots < execAmount) {
+          val message = s"The configuration of resource: 
${taskReq.resourceName} " +
+            s"(exec = ${execAmount}, task = ${taskReq.amount}) will result in 
wasted " +
+            s"resources due to resource ${limitingResourceName} limiting the 
number of " +
+            s"runnable tasks per executor to: ${numSlots}. Please adjust your 
configuration."
           if (Utils.isTesting) {
             throw new SparkException(message)
           } else {
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 803167e..2299c54 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -24,6 +24,7 @@ import org.apache.spark.annotation.{DeveloperApi, Evolving}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{AccumulatorV2, TaskCompletionListener, TaskFailureListener}
 
@@ -178,7 +179,8 @@ abstract class TaskContext extends Serializable {
 
   /**
    * Resources allocated to the task. The key is the resource name and the value is information
-   * about the resource. Please refer to [[ResourceInformation]] for specifics.
+   * about the resource. Please refer to [[org.apache.spark.resource.ResourceInformation]] for
+   * specifics.
    */
   @Evolving
   def resources(): Map[String, ResourceInformation]
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
index 8e40b7f..516fb95 100644
--- a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -27,6 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util._
 
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index 5d88612..41ae3ae 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -20,9 +20,11 @@ package org.apache.spark
 import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
 import java.net.{HttpURLConnection, URI, URL}
 import java.nio.charset.StandardCharsets
+import java.nio.file.{Files => JavaFiles}
+import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, OWNER_WRITE}
 import java.security.SecureRandom
 import java.security.cert.X509Certificate
-import java.util.{Arrays, Properties}
+import java.util.{Arrays, EnumSet, Properties}
 import java.util.concurrent.{TimeoutException, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
@@ -36,6 +38,8 @@ import scala.util.Try
 
 import com.google.common.io.{ByteStreams, Files}
 import org.apache.log4j.PropertyConfigurator
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config._
@@ -312,15 +316,21 @@ private[spark] object TestUtils {
     current ++ current.filter(_.isDirectory).flatMap(recursiveList)
   }
 
-  /**
-   * Set task resource requirement.
-   */
-  def setTaskResourceRequirement(
-      conf: SparkConf,
-      resourceName: String,
-      resourceCount: Int): SparkConf = {
-    val key = s"${SPARK_TASK_RESOURCE_PREFIX}${resourceName}${SPARK_RESOURCE_AMOUNT_SUFFIX}"
-    conf.set(key, resourceCount.toString)
+  /** Creates a temp JSON file that contains the input JSON record. */
+  def createTempJsonFile(dir: File, prefix: String, jsonValue: JValue): String = {
+    val file = File.createTempFile(prefix, ".json", dir)
+    JavaFiles.write(file.toPath, compact(render(jsonValue)).getBytes())
+    file.getPath
+  }
+
+  /** Creates a temp bash script that prints the given output. */
+  def createTempScriptWithExpectedOutput(dir: File, prefix: String, output: String): String = {
+    val file = File.createTempFile(prefix, ".sh", dir)
+    val script = s"cat <<EOF\n$output\nEOF\n"
+    Files.write(script, file, StandardCharsets.UTF_8)
+    JavaFiles.setPosixFilePermissions(file.toPath,
+      EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
+    file.getPath
   }
 }
 
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 9b5f9f5..2f4fc0e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -34,6 +34,8 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.worker.WorkerWatcher
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
@@ -48,7 +50,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     cores: Int,
     userClassPath: Seq[URL],
     env: SparkEnv,
-    resourcesFile: Option[String])
+    resourcesFileOpt: Option[String])
   extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
 
   private implicit val formats = DefaultFormats
@@ -70,7 +72,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   override def onStart() {
     logInfo("Connecting to driver: " + driverUrl)
-    val resources = parseOrFindResources(resourcesFile)
+    val resources = parseOrFindResources(resourcesFileOpt)
     rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       // This is a very fast action so we can use "ThreadUtils.sameThread"
       driver = Some(ref)
@@ -86,36 +88,19 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   // visible for testing
-  def parseOrFindResources(resourcesFile: Option[String]): Map[String, ResourceInformation] = {
+  def parseOrFindResources(resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = {
     // only parse the resources if a task requires them
-    val resourceInfo = if (env.conf.getAllWithPrefix(SPARK_TASK_RESOURCE_PREFIX).nonEmpty) {
-      val actualExecResources = resourcesFile.map { rFile => {
-        ResourceDiscoverer.parseAllocatedFromJsonFile(rFile)
-      }}.getOrElse {
-        ResourceDiscoverer.discoverResourcesInformation(env.conf, SPARK_EXECUTOR_RESOURCE_PREFIX)
-      }
-
-      if (actualExecResources.isEmpty) {
+    val resourceInfo = if (parseTaskResourceRequirements(env.conf).nonEmpty) {
+      val resources = getOrDiscoverAllResources(env.conf, SPARK_EXECUTOR_PREFIX, resourcesFileOpt)
+      if (resources.isEmpty) {
         throw new SparkException("User specified resources per task via: " +
-          s"$SPARK_TASK_RESOURCE_PREFIX, but can't find any resources 
available on the executor.")
+          s"$SPARK_TASK_PREFIX, but can't find any resources available on the 
executor.")
       }
-      val execReqResourcesAndCounts =
-        env.conf.getAllWithPrefixAndSuffix(SPARK_EXECUTOR_RESOURCE_PREFIX,
-          SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
-
-      ResourceDiscoverer.checkActualResourcesMeetRequirements(execReqResourcesAndCounts,
-        actualExecResources)
-
-      
logInfo("===============================================================================")
-      logInfo(s"Executor $executorId Resources:")
-      actualExecResources.foreach { case (k, v) => logInfo(s"$k -> $v") }
-      
logInfo("===============================================================================")
-
-      actualExecResources
+      resources
     } else {
-      if (resourcesFile.nonEmpty) {
-        logWarning(s"A resources file was specified but the application is not 
configured " +
-          s"to use any resources, see the configs with prefix: 
${SPARK_TASK_RESOURCE_PREFIX}")
+      if (resourcesFileOpt.nonEmpty) {
+        logWarning("A resources file was specified but the application is not 
configured " +
+          s"to use any resources, see the configs with prefix: 
${SPARK_TASK_PREFIX}")
       }
       Map.empty[String, ResourceInformation]
     }
@@ -245,13 +230,14 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       appId: String,
       workerUrl: Option[String],
       userClassPath: mutable.ListBuffer[URL],
-      resourcesFile: Option[String])
+      resourcesFileOpt: Option[String])
 
   def main(args: Array[String]): Unit = {
     val createFn: (RpcEnv, Arguments, SparkEnv) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
       new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile)
+        arguments.hostname, arguments.cores, arguments.userClassPath, env,
+        arguments.resourcesFileOpt)
     }
     run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn)
     System.exit(0)
@@ -313,7 +299,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     var executorId: String = null
     var hostname: String = null
     var cores: Int = 0
-    var resourcesFile: Option[String] = None
+    var resourcesFileOpt: Option[String] = None
     var appId: String = null
     var workerUrl: Option[String] = None
     val userClassPath = new mutable.ListBuffer[URL]()
@@ -334,7 +320,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
           cores = value.toInt
           argv = tail
         case ("--resourcesFile") :: value :: tail =>
-          resourcesFile = Some(value)
+          resourcesFileOpt = Some(value)
           argv = tail
         case ("--app-id") :: value :: tail =>
           appId = value
@@ -361,7 +347,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
     }
 
     Arguments(driverUrl, executorId, hostname, cores, appId, workerUrl,
-      userClassPath, resourcesFile)
+      userClassPath, resourcesFileOpt)
   }
 
   private def printUsageAndExit(classNameForEntry: String): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 4b0c607..c2dab68 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -31,13 +31,9 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.MAX_
 
 package object config {
 
-  private[spark] val SPARK_DRIVER_RESOURCE_PREFIX = "spark.driver.resource."
-  private[spark] val SPARK_EXECUTOR_RESOURCE_PREFIX = "spark.executor.resource."
-  private[spark] val SPARK_TASK_RESOURCE_PREFIX = "spark.task.resource."
-
-  private[spark] val SPARK_RESOURCE_AMOUNT_SUFFIX = ".amount"
-  private[spark] val SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX = ".discoveryScript"
-  private[spark] val SPARK_RESOURCE_VENDOR_SUFFIX = ".vendor"
+  private[spark] val SPARK_DRIVER_PREFIX = "spark.driver"
+  private[spark] val SPARK_EXECUTOR_PREFIX = "spark.executor"
+  private[spark] val SPARK_TASK_PREFIX = "spark.task"
 
   private[spark] val DRIVER_RESOURCES_FILE =
     ConfigBuilder("spark.driver.resourcesFile")
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
new file mode 100644
index 0000000..96aef74
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import scala.util.control.NonFatal
+
+import org.json4s.{DefaultFormats, Extraction, JValue}
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.Evolving
+
+/**
+ * Class to hold information about a type of Resource. A resource could be a GPU, FPGA, etc.
+ * The array of addresses are resource specific and its up to the user to interpret the address.
+ *
+ * One example is GPUs, where the addresses would be the indices of the GPUs
+ *
+ * @param name the name of the resource
+ * @param addresses an array of strings describing the addresses of the resource
+ */
+@Evolving
+class ResourceInformation(
+    val name: String,
+    val addresses: Array[String]) extends Serializable {
+
+  override def toString: String = s"[name: ${name}, addresses: ${addresses.mkString(",")}]"
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: ResourceInformation =>
+        that.getClass == this.getClass &&
+        that.name == name && that.addresses.toSeq == addresses.toSeq
+      case _ =>
+        false
+    }
+  }
+
+  override def hashCode(): Int = Seq(name, addresses.toSeq).hashCode()
+}
+
+private[spark] object ResourceInformation {
+
+  private lazy val exampleJson: String = compact(render(
+    ResourceInformationJson("gpu", Seq("0", "1")).toJValue))
+
+  /**
+   * Parses a JSON string into a [[ResourceInformation]] instance.
+   */
+  def parseJson(json: String): ResourceInformation = {
+    implicit val formats = DefaultFormats
+    try {
+      parse(json).extract[ResourceInformationJson].toResourceInformation
+    } catch {
+      case NonFatal(e) =>
+        throw new SparkException(s"Error parsing JSON into 
ResourceInformation:\n$json\n" +
+          s"Here is a correct example: $exampleJson.", e)
+    }
+  }
+}
+
+/** A case class to simplify JSON serialization of [[ResourceInformation]]. */
+private case class ResourceInformationJson(name: String, addresses: Seq[String]) {
+
+  def toJValue: JValue = {
+    Extraction.decompose(this)(DefaultFormats)
+  }
+
+  def toResourceInformation: ResourceInformation = {
+    new ResourceInformation(name, addresses.toArray)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
new file mode 100644
index 0000000..6926586
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.io.File
+import java.nio.file.{Files, Paths}
+
+import scala.util.control.NonFatal
+
+import org.json4s.DefaultFormats
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils.executeAndGetOutput
+
+/**
+ * Resource identifier.
+ * @param componentName spark.driver / spark.executor / spark.task
+ * @param resourceName  gpu, fpga, etc
+ */
+private[spark] case class ResourceID(componentName: String, resourceName: String) {
+  def confPrefix: String = s"$componentName.resource.$resourceName." // with ending dot
+  def amountConf: String = s"$confPrefix${ResourceUtils.AMOUNT}"
+  def discoveryScriptConf: String = s"$confPrefix${ResourceUtils.DISCOVERY_SCRIPT}"
+  def vendorConf: String = s"$confPrefix${ResourceUtils.VENDOR}"
+}
+
+private[spark] case class ResourceRequest(
+    id: ResourceID,
+    amount: Int,
+    discoveryScript: Option[String],
+    vendor: Option[String])
+
+private[spark] case class TaskResourceRequirement(resourceName: String, amount: Int)
+
+/**
+ * Case class representing allocated resource addresses for a specific resource.
+ * Cluster manager uses the JSON serialization of this case class to pass allocated resource info to
+ * driver and executors. See the ``--resourcesFile`` option there.
+ */
+private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[String]) {
+  def toResourceInformation: ResourceInformation = {
+    new ResourceInformation(id.resourceName, addresses.toArray)
+  }
+}
+
+private[spark] object ResourceUtils extends Logging {
+
+  // config suffixes
+  val DISCOVERY_SCRIPT = "discoveryScript"
+  val VENDOR = "vendor"
+  // user facing configs use .amount to allow to extend in the future,
+  // internally we currently only support addresses, so its just an integer count
+  val AMOUNT = "amount"
+
+  def parseResourceRequest(sparkConf: SparkConf, resourceId: ResourceID): ResourceRequest = {
+    val settings = sparkConf.getAllWithPrefix(resourceId.confPrefix).toMap
+    val amount = settings.getOrElse(AMOUNT,
+      throw new SparkException(s"You must specify an amount for 
${resourceId.resourceName}")
+    ).toInt
+    val discoveryScript = settings.get(DISCOVERY_SCRIPT)
+    val vendor = settings.get(VENDOR)
+    ResourceRequest(resourceId, amount, discoveryScript, vendor)
+  }
+
+  def listResourceIds(sparkConf: SparkConf, componentName: String): Seq[ResourceID] = {
+    sparkConf.getAllWithPrefix(s"$componentName.resource.").map { case (key, _) =>
+      key.substring(0, key.indexOf('.'))
+    }.toSet.toSeq.map(name => ResourceID(componentName, name))
+  }
+
+  def parseAllResourceRequests(
+      sparkConf: SparkConf,
+      componentName: String): Seq[ResourceRequest] = {
+    listResourceIds(sparkConf, componentName).map { id =>
+      parseResourceRequest(sparkConf, id)
+    }
+  }
+
+  def parseTaskResourceRequirements(sparkConf: SparkConf): Seq[TaskResourceRequirement] = {
+    parseAllResourceRequests(sparkConf, SPARK_TASK_PREFIX).map { request =>
+      TaskResourceRequirement(request.id.resourceName, request.amount)
+    }
+  }
+
+  private def parseAllocatedFromJsonFile(resourcesFile: String): Seq[ResourceAllocation] = {
+    implicit val formats = DefaultFormats
+    val json = new String(Files.readAllBytes(Paths.get(resourcesFile)))
+    try {
+      parse(json).extract[Seq[ResourceAllocation]]
+    } catch {
+      case NonFatal(e) =>
+        throw new SparkException(s"Error parsing resources file 
$resourcesFile", e)
+    }
+  }
+
+  private def parseAllocatedOrDiscoverResources(
+      sparkConf: SparkConf,
+      componentName: String,
+      resourcesFileOpt: Option[String]): Seq[ResourceAllocation] = {
+    val allocated = resourcesFileOpt.toSeq.flatMap(parseAllocatedFromJsonFile)
+      .filter(_.id.componentName == componentName)
+    val otherResourceIds = listResourceIds(sparkConf, componentName).diff(allocated.map(_.id))
+    allocated ++ otherResourceIds.map { id =>
+      val request = parseResourceRequest(sparkConf, id)
+      ResourceAllocation(id, discoverResource(request).addresses)
+    }
+  }
+
+  private def assertResourceAllocationMeetsRequest(
+      allocation: ResourceAllocation,
+      request: ResourceRequest): Unit = {
+    require(allocation.id == request.id && allocation.addresses.size >= request.amount,
+      s"Resource: ${allocation.id.resourceName}, with addresses: " +
+      s"${allocation.addresses.mkString(",")} " +
+      s"is less than what the user requested: ${request.amount})")
+  }
+
+  private def assertAllResourceAllocationsMeetRequests(
+      allocations: Seq[ResourceAllocation],
+      requests: Seq[ResourceRequest]): Unit = {
+    val allocated = allocations.map(x => x.id -> x).toMap
+    requests.foreach(r => assertResourceAllocationMeetsRequest(allocated(r.id), r))
+  }
+
+  /**
+   * Gets all allocated resource information for the input component from input resources file and
+   * discover the remaining via discovery scripts.
+   * It also verifies the resource allocation meets required amount for each resource.
+   * @return a map from resource name to resource info
+   */
+  def getOrDiscoverAllResources(
+      sparkConf: SparkConf,
+      componentName: String,
+      resourcesFileOpt: Option[String]): Map[String, ResourceInformation] = {
+    val requests = parseAllResourceRequests(sparkConf, componentName)
+    val allocations = parseAllocatedOrDiscoverResources(sparkConf, componentName, resourcesFileOpt)
+    assertAllResourceAllocationsMeetRequests(allocations, requests)
+    val resourceInfoMap = allocations.map(a => (a.id.resourceName, a.toResourceInformation)).toMap
+    logInfo("==============================================================")
+    logInfo(s"Resources for 
$componentName:\n${resourceInfoMap.mkString("\n")}")
+    logInfo("==============================================================")
+    resourceInfoMap
+  }
+
+  // visible for test
+  private[spark] def discoverResource(resourceRequest: ResourceRequest): ResourceInformation = {
+    val resourceName = resourceRequest.id.resourceName
+    val script = resourceRequest.discoveryScript
+    val result = if (script.nonEmpty) {
+      val scriptFile = new File(script.get)
+      // check that script exists and try to execute
+      if (scriptFile.exists()) {
+        val output = executeAndGetOutput(Seq(script.get), new File("."))
+        ResourceInformation.parseJson(output)
+      } else {
+        throw new SparkException(s"Resource script: $scriptFile to discover 
$resourceName " +
+          "doesn't exist!")
+      }
+    } else {
+      throw new SparkException(s"User is expecting to use resource: 
$resourceName but " +
+        "didn't specify a discovery script!")
+    }
+    if (!result.name.equals(resourceName)) {
+      throw new SparkException(s"Error running the resource discovery script 
${script.get}: " +
+        s"script returned resource name ${result.name} and we were expecting 
$resourceName.")
+    }
+    result
+  }
+
+  // known types of resources
+  final val GPU: String = "gpu"
+  final val FPGA: String = "fpga"
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index bb44e9a..9dfbf86 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -26,6 +26,7 @@ import org.apache.spark.internal.config.APP_CALLER_CONTEXT
 import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rdd.InputFileBlockHolder
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.util._
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index c29ee06..247cfe7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -26,7 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.immutable
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 
-import org.apache.spark.ResourceInformation
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index cf07847..242486c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -28,9 +28,9 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorMetrics
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config
+import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
@@ -93,7 +93,7 @@ private[spark] class TaskSchedulerImpl(
   val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
   // Resources to request per task
-  val resourcesPerTask = conf.getTaskResourceRequirements()
+  val resourcesReqsPerTask = ResourceUtils.parseTaskResourceRequirements(sc.conf)
 
   // TaskSetManagers are not thread safe, so any access to one should be synchronized
   // on this class.  Protected by `this`
@@ -382,8 +382,8 @@ private[spark] class TaskSchedulerImpl(
    * Check whether the resources from the WorkerOffer are enough to run at least one task.
    */
   private def resourcesMeetTaskRequirements(resources: Map[String, Buffer[String]]): Boolean = {
-    resourcesPerTask.forall { case (rName, rNum) =>
-      resources.contains(rName) && resources(rName).size >= rNum
+    resourcesReqsPerTask.forall { req =>
+      resources.contains(req.resourceName) && resources(req.resourceName).size >= req.amount
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 6f2b982..e7645fc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -30,6 +30,7 @@ import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, SystemClock, Utils}
 import org.apache.spark.util.collection.MedianHeap
@@ -534,14 +535,16 @@ private[spark] class TaskSetManager(
         logInfo(s"Starting $taskName (TID $taskId, $host, executor 
${info.executorId}, " +
           s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit()} bytes)")
 
-        val extraResources = sched.resourcesPerTask.map { case (rName, rNum) =>
+        val extraResources = sched.resourcesReqsPerTask.map { taskReq =>
+          val rName = taskReq.resourceName
+          val count = taskReq.amount
           val rAddresses = availableResources.getOrElse(rName, Seq.empty)
-          assert(rAddresses.size >= rNum, s"Required $rNum $rName addresses, but only " +
+          assert(rAddresses.size >= count, s"Required $count $rName addresses, but only " +
             s"${rAddresses.size} available.")
           // We'll drop the allocated addresses later inside TaskSchedulerImpl.
-          val allocatedAddresses = rAddresses.take(rNum)
+          val allocatedAddresses = rAddresses.take(count)
           (rName, new ResourceInformation(rName, allocatedAddresses.toArray))
-        }
+        }.toMap
 
         sched.dagScheduler.taskStarted(task, info)
         new TaskDescription(
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 82d51f8..a90fff0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -19,8 +19,8 @@ package org.apache.spark.scheduler.cluster
 
 import java.nio.ByteBuffer
 
-import org.apache.spark.ResourceInformation
 import org.apache.spark.TaskState.TaskState
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.ExecutorLossReason
 import org.apache.spark.util.SerializableBuffer
diff --git a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala b/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
deleted file mode 100644
index 2272573..0000000
--- a/core/src/test/scala/org/apache/spark/ResourceDiscovererSuite.scala
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.File
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files => JavaFiles}
-import java.nio.file.attribute.PosixFilePermission._
-import java.util.EnumSet
-
-import com.google.common.io.Files
-
-import org.apache.spark.ResourceName._
-import org.apache.spark.internal.config._
-import org.apache.spark.util.Utils
-
-class ResourceDiscovererSuite extends SparkFunSuite
-    with LocalSparkContext {
-
-  def mockDiscoveryScript(file: File, result: String): String = {
-    Files.write(s"echo $result", file, StandardCharsets.UTF_8)
-    JavaFiles.setPosixFilePermissions(file.toPath(),
-      EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
-    file.getPath()
-  }
-
-  test("Resource discoverer no resources") {
-    val sparkconf = new SparkConf
-    val resources =
-      ResourceDiscoverer.discoverResourcesInformation(sparkconf, SPARK_EXECUTOR_RESOURCE_PREFIX)
-    assert(resources.size === 0)
-    assert(resources.get(GPU).isEmpty,
-      "Should have a gpus entry that is empty")
-  }
-
-  test("Resource discoverer multiple gpus") {
-    val sparkconf = new SparkConf
-    assume(!(Utils.isWindows))
-    withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val scriptPath = mockDiscoveryScript(gpuFile,
-        """'{"name": "gpu","addresses":["0", "1"]}'""")
-      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
-      val resources =
-        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
-      val gpuValue = resources.get(GPU)
-      assert(gpuValue.nonEmpty, "Should have a gpu entry")
-      assert(gpuValue.get.name == "gpu", "name should be gpu")
-      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
-      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 
0,1 entries")
-    }
-  }
-
-  test("Resource discoverer passed in resources") {
-    val sparkconf = new SparkConf
-    assume(!(Utils.isWindows))
-    withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val gpuScript = mockDiscoveryScript(gpuFile,
-        """'{"name": "gpu","addresses":["0", "1"]}'""")
-      val fpgaFile = new File(dir, "fpgaDiscoverScript")
-      val fpgaScript = mockDiscoveryScript(fpgaFile,
-        """'{"name": "fpga","addresses":["f0", "f1"]}'""")
-      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuScript)
-      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaScript)
-      // it should only look at the resources passed in and ignore fpga conf
-      val resources =
-        ResourceDiscoverer.discoverResourcesInformation(sparkconf,
-          SPARK_EXECUTOR_RESOURCE_PREFIX, Some(Set(GPU)))
-      assert(resources.size === 1, "should only have the gpu resource")
-      val gpuValue = resources.get(GPU)
-      assert(gpuValue.nonEmpty, "Should have a gpu entry")
-      assert(gpuValue.get.name == "gpu", "name should be gpu")
-      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
-      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 
0,1 entries")
-    }
-  }
-
-  test("Resource discoverer no addresses errors") {
-    val sparkconf = new SparkConf
-    assume(!(Utils.isWindows))
-    withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val scriptPath = mockDiscoveryScript(gpuFile,
-        """'{"name": "gpu"}'""")
-      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
-      val resources =
-        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
-      val gpuValue = resources.get(GPU)
-      assert(gpuValue.nonEmpty, "Should have a gpu entry")
-      assert(gpuValue.get.name == "gpu", "name should be gpu")
-      assert(gpuValue.get.addresses.size == 0, "Should have 0 indexes")
-    }
-  }
-
-  test("Resource discoverer multiple resource types") {
-    val sparkconf = new SparkConf
-    assume(!(Utils.isWindows))
-    withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val gpuDiscovery = mockDiscoveryScript(gpuFile,
-        """'{"name": "gpu", "addresses": ["0", "1"]}'""")
-      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
-
-      val fpgaFile = new File(dir, "fpgaDiscoverScript")
-      val fpgaDiscovery = mockDiscoveryScript(fpgaFile,
-        """'{"name": "fpga", "addresses": ["f1", "f2", "f3"]}'""")
-      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaDiscovery)
-
-      val resources =
-        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
-      assert(resources.size === 2)
-      val gpuValue = resources.get(GPU)
-      assert(gpuValue.nonEmpty, "Should have a gpu entry")
-      assert(gpuValue.get.name == "gpu", "name should be gpu")
-      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
-      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 
0,1 entries")
-
-      val fpgaValue = resources.get(FPGA)
-      assert(fpgaValue.nonEmpty, "Should have a gpu entry")
-      assert(fpgaValue.get.name == "fpga", "name should be fpga")
-      assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes")
-      assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep,
-        "should have f1,f2,f3 entries")
-    }
-  }
-
-  test("Resource discoverer multiple gpus on driver") {
-    val sparkconf = new SparkConf
-    assume(!(Utils.isWindows))
-    withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val gpuDiscovery = mockDiscoveryScript(gpuFile,
-        """'{"name": "gpu", "addresses": ["0", "1"]}'""")
-      sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
-      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, "boguspath")
-      // make sure it reads from correct config, here it should use driver
-      val resources =
-        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_DRIVER_RESOURCE_PREFIX)
-      val gpuValue = resources.get(GPU)
-      assert(gpuValue.nonEmpty, "Should have a gpu entry")
-      assert(gpuValue.get.name == "gpu", "name should be gpu")
-      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
-      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 
0,1 entries")
-    }
-  }
-
-  test("Resource discoverer script returns mismatched name") {
-    val sparkconf = new SparkConf
-    assume(!(Utils.isWindows))
-    withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val gpuDiscovery = mockDiscoveryScript(gpuFile,
-        """'{"name": "fpga", "addresses": ["0", "1"]}'""")
-      sparkconf.set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
-      val error = intercept[SparkException] {
-        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_DRIVER_RESOURCE_PREFIX)
-      }.getMessage()
-
-      assert(error.contains("Error running the resource discovery script"))
-    }
-  }
-
-  test("Resource discoverer script returns invalid format") {
-    val sparkconf = new SparkConf
-    assume(!(Utils.isWindows))
-    withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val gpuDiscovery = mockDiscoveryScript(gpuFile,
-        """'{"addresses": ["0", "1"]}'""")
-      sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, gpuDiscovery)
-      val error = intercept[SparkException] {
-        ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
-      }.getMessage()
-
-      assert(error.contains("Error running the resource discovery"))
-    }
-  }
-
-  test("Resource discoverer script doesn't exist") {
-    val sparkconf = new SparkConf
-    withTempDir { dir =>
-      val file1 = new File(dir, "bogusfilepath")
-      try {
-        sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-          SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, file1.getPath())
-
-        val error = intercept[SparkException] {
-          ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
-        }.getMessage()
-
-        assert(error.contains("doesn't exist"))
-      } finally {
-        JavaFiles.deleteIfExists(file1.toPath())
-      }
-    }
-  }
-
-  test("gpu's specified but not discovery script") {
-    val sparkconf = new SparkConf
-    sparkconf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-      SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
-
-    val error = intercept[SparkException] {
-      ResourceDiscoverer.discoverResourcesInformation(sparkconf, 
SPARK_EXECUTOR_RESOURCE_PREFIX)
-    }.getMessage()
-
-    assert(error.contains("User is expecting to use"))
-  }
-
-}
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index c1265ce..74f5854 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -25,12 +25,14 @@ import scala.util.{Random, Try}
 
 import com.esotericsoftware.kryo.Kryo
 
-import org.apache.spark.ResourceName._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.internal.config.Network._
 import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, 
KryoSerializer}
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils, Utils}
 
@@ -111,34 +113,6 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
     assert(conf.getOption("k4") === None)
   }
 
-  test("basic getAllWithPrefixAndPostfix") {
-    val conf = new SparkConf(false)
-    conf.set("spark.prefix.main.suffix", "v1")
-    val prefix = "spark.prefix."
-    val suffix = ".suffix"
-    assert(conf.getAllWithPrefixAndSuffix(prefix, suffix).toSet ===
-      Set(("main", "v1")))
-
-    conf.set("spark.prefix.main2.suffix", "v2")
-    conf.set("spark.prefix.main3.extra1.suffix", "v3")
-    conf.set("spark.prefix.main4.extra2.nonmatchingsuffix", "v4")
-    conf.set("spark.notmatchingprefix.main4.suffix", "v5")
-
-    assert(conf.getAllWithPrefixAndSuffix(prefix, suffix).toSet ===
-      Set(("main", "v1"), ("main2", "v2"), ("main3.extra1", "v3")))
-  }
-
-  test("test prefix config parsing utilities") {
-    val conf = new SparkConf(false)
-    conf.set("spark.prefix.main.suffix", "v1")
-    val prefix = "spark.prefix."
-    val suffix = ".suffix"
-    val configsWithPrefix = conf.getAllWithPrefix(prefix)
-    assert(configsWithPrefix.toSet === Set(("main.suffix", "v1")))
-    assert(SparkConf.getBaseOfConfigs(configsWithPrefix) === Set("main"))
-    assert(SparkConf.getConfigsWithSuffix(configsWithPrefix, suffix).toSet === 
Set(("main", "v1")))
-  }
-
   test("basic getAllWithPrefix") {
     val prefix = "spark.prefix."
     val conf = new SparkConf(false)
@@ -450,23 +424,20 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
 
   test("get task resource requirement from config") {
     val conf = new SparkConf()
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"1")
-    var taskResourceRequirement = conf.getTaskResourceRequirements()
+    conf.set(TASK_GPU_ID.amountConf, "2")
+    conf.set(TASK_FPGA_ID.amountConf, "1")
+    var taskResourceRequirement =
+      parseTaskResourceRequirements(conf).map(req => (req.resourceName, 
req.amount)).toMap
+
     assert(taskResourceRequirement.size == 2)
     assert(taskResourceRequirement(GPU) == 2)
     assert(taskResourceRequirement(FPGA) == 1)
 
-    conf.remove(SPARK_TASK_RESOURCE_PREFIX + FPGA + 
SPARK_RESOURCE_AMOUNT_SUFFIX)
+    conf.remove(TASK_FPGA_ID.amountConf)
     // Ignore invalid prefix
-    conf.set("spark.invalid.prefix" + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
-    taskResourceRequirement = conf.getTaskResourceRequirements()
-    assert(taskResourceRequirement.size == 1)
-    assert(taskResourceRequirement.get(FPGA).isEmpty)
-
-    // Ignore invalid suffix
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + "invalid.suffix", "1")
-    taskResourceRequirement = conf.getTaskResourceRequirements()
+    conf.set(ResourceID("spark.invalid.prefix", FPGA).amountConf, "1")
+    taskResourceRequirement =
+      parseTaskResourceRequirements(conf).map(req => (req.resourceName, 
req.amount)).toMap
     assert(taskResourceRequirement.size == 1)
     assert(taskResourceRequirement.get(FPGA).isEmpty)
   }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index d1a36d4..fa2c4bd 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -20,9 +20,6 @@ package org.apache.spark
 import java.io.File
 import java.net.{MalformedURLException, URI}
 import java.nio.charset.StandardCharsets
-import java.nio.file.{Files => JavaFiles}
-import java.nio.file.attribute.PosixFilePermission._
-import java.util.EnumSet
 import java.util.concurrent.{CountDownLatch, Semaphore, TimeUnit}
 
 import scala.concurrent.duration._
@@ -33,15 +30,16 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
 import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => 
NewTextInputFormat}
-import org.json4s.JsonAST.JArray
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods.{compact, render}
+import org.json4s.{DefaultFormats, Extraction}
 import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.ResourceName.GPU
+import org.apache.spark.TestUtils._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.resource.ResourceAllocation
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.scheduler.{SparkListener, 
SparkListenerExecutorMetricsUpdate, SparkListenerJobStart, 
SparkListenerTaskEnd, SparkListenerTaskStart}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -739,19 +737,16 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
     }
   }
 
-  test("test gpu driver discovery under local-cluster mode") {
+  test("test driver discovery under local-cluster mode") {
     withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val scriptPath = mockDiscoveryScript(gpuFile,
-        """'{"name": "gpu","addresses":["5", "6"]}'""")
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
+        """{"name": "gpu","addresses":["5", "6"]}""")
 
       val conf = new SparkConf()
-        .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-          SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
-        .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-          SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
         .setMaster("local-cluster[1, 1, 1024]")
         .setAppName("test-cluster")
+      conf.set(DRIVER_GPU_ID.amountConf, "1")
+      conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
       sc = new SparkContext(conf)
 
       // Ensure all executors has started
@@ -764,39 +759,31 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
     }
   }
 
-  private def writeJsonFile(dir: File, strToWrite: JArray): String = {
-    val f1 = File.createTempFile("test-resource-parser1", "", dir)
-    JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes())
-    f1.getPath()
-  }
-
   test("test gpu driver resource files and discovery under local-cluster 
mode") {
     withTempDir { dir =>
-      val gpuFile = new File(dir, "gpuDiscoverScript")
-      val scriptPath = mockDiscoveryScript(gpuFile,
-        """'{"name": "gpu","addresses":["5", "6"]}'""")
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
+        """{"name": "gpu","addresses":["5", "6"]}""")
 
+      implicit val formats = DefaultFormats
       val gpusAllocated =
-        ("name" -> "gpu") ~
-        ("addresses" -> Seq("0", "1", "8"))
-      val ja = JArray(List(gpusAllocated))
-      val resourcesFile = writeJsonFile(dir, ja)
+        ResourceAllocation(DRIVER_GPU_ID, Seq("0", "1", "8"))
+      val ja = Extraction.decompose(Seq(gpusAllocated))
+      val resourcesFile = createTempJsonFile(dir, "resources", ja)
 
       val conf = new SparkConf()
-        .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-          SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
-        .set(SPARK_DRIVER_RESOURCE_PREFIX + GPU +
-          SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, scriptPath)
         .set(DRIVER_RESOURCES_FILE, resourcesFile)
         .setMaster("local-cluster[1, 1, 1024]")
         .setAppName("test-cluster")
+      conf.set(DRIVER_GPU_ID.amountConf, "1")
+      conf.set(DRIVER_GPU_ID.discoveryScriptConf, scriptPath)
+
       sc = new SparkContext(conf)
 
       // Ensure all executors has started
       eventually(timeout(10.seconds)) {
         assert(sc.statusTracker.getExecutorInfos.size == 1)
       }
-      // driver gpu addresses config should take precedence over the script
+      // driver gpu resources file should take precedence over the script
       assert(sc.resources.size === 1)
       assert(sc.resources.get(GPU).get.addresses === Array("0", "1", "8"))
       assert(sc.resources.get(GPU).get.name === "gpu")
@@ -805,10 +792,9 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 
   test("Test parsing resources task configs with missing executor config") {
     val conf = new SparkConf()
-      .set(SPARK_TASK_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
+    conf.set(TASK_GPU_ID.amountConf, "1")
 
     var error = intercept[SparkException] {
       sc = new SparkContext(conf)
@@ -821,28 +807,26 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
 
   test("Test parsing resources executor config < task requirements") {
     val conf = new SparkConf()
-      .set(SPARK_TASK_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU +
-        SPARK_RESOURCE_AMOUNT_SUFFIX, "1")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
+    conf.set(TASK_GPU_ID.amountConf, "2")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "1")
 
     var error = intercept[SparkException] {
       sc = new SparkContext(conf)
     }.getMessage()
 
-    assert(error.contains("The executor resource config: " +
-      "spark.executor.resource.gpu.amount = 1 has to be >= the task config: " +
+    assert(error.contains("The executor resource config: 
spark.executor.resource.gpu.amount = 1 " +
+      "has to be >= the requested amount in task resource config: " +
       "spark.task.resource.gpu.amount = 2"))
   }
 
   test("Parse resources executor config not the same multiple numbers of the 
task requirements") {
     val conf = new SparkConf()
-      .set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"2")
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + 
SPARK_RESOURCE_AMOUNT_SUFFIX, "4")
       .setMaster("local-cluster[1, 1, 1024]")
       .setAppName("test-cluster")
+    conf.set(TASK_GPU_ID.amountConf, "2")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "4")
 
     var error = intercept[SparkException] {
       sc = new SparkContext(conf)
@@ -853,32 +837,21 @@ class SparkContextSuite extends SparkFunSuite with 
LocalSparkContext with Eventu
       "executor to: 1. Please adjust your configuration."))
   }
 
-  def mockDiscoveryScript(file: File, result: String): String = {
-    Files.write(s"echo $result", file, StandardCharsets.UTF_8)
-    JavaFiles.setPosixFilePermissions(file.toPath(),
-      EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
-    file.getPath()
-  }
-
   test("test resource scheduling under local-cluster mode") {
     import org.apache.spark.TestUtils._
 
     assume(!(Utils.isWindows))
     withTempDir { dir =>
-      val resourceFile = new File(dir, "resourceDiscoverScript")
-      val resources = """'{"name": "gpu", "addresses": ["0", "1", "2"]}'"""
-      Files.write(s"echo $resources", resourceFile, StandardCharsets.UTF_8)
-      JavaFiles.setPosixFilePermissions(resourceFile.toPath(),
-        EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
-      val discoveryScript = resourceFile.getPath()
+      val discoveryScript = createTempScriptWithExpectedOutput(dir, 
"resourceDiscoveryScript",
+        """{"name": "gpu","addresses":["0", "1", "2"]}""")
 
       val conf = new SparkConf()
-        
.set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_AMOUNT_SUFFIX}", 
"3")
-        
.set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX}",
-          discoveryScript)
         .setMaster("local-cluster[3, 3, 1024]")
         .setAppName("test-cluster")
-      setTaskResourceRequirement(conf, GPU, 1)
+      conf.set(TASK_GPU_ID.amountConf, "1")
+      conf.set(EXECUTOR_GPU_ID.amountConf, "3")
+      conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, discoveryScript)
+
       sc = new SparkContext(conf)
 
       // Ensure all executors has started
diff --git 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 3c4d51f..693b0ee 100644
--- 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -20,26 +20,23 @@ package org.apache.spark.executor
 import java.io.File
 import java.net.URL
 import java.nio.ByteBuffer
-import java.nio.charset.StandardCharsets
-import java.nio.file.{Files => JavaFiles}
-import java.nio.file.attribute.PosixFilePermission.{OWNER_EXECUTE, OWNER_READ, 
OWNER_WRITE}
-import java.util.{EnumSet, Properties}
+import java.util.Properties
 
 import scala.collection.mutable
 import scala.concurrent.duration._
 
-import com.google.common.io.Files
+import org.json4s.{DefaultFormats, Extraction}
 import org.json4s.JsonAST.{JArray, JObject}
 import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods.{compact, render}
 import org.mockito.Mockito.when
 import org.scalatest.concurrent.Eventually.{eventually, timeout}
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
-import org.apache.spark.ResourceInformation
-import org.apache.spark.ResourceName._
-import org.apache.spark.internal.config._
+import org.apache.spark.TestUtils._
+import org.apache.spark.resource.{ResourceAllocation, ResourceInformation}
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.TaskDescription
 import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.LaunchTask
@@ -49,15 +46,11 @@ import org.apache.spark.util.{SerializableBuffer, Utils}
 class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     with LocalSparkContext with MockitoSugar {
 
-  private def writeFileWithJson(dir: File, strToWrite: JArray): String = {
-    val f1 = File.createTempFile("test-resource-parser1", "", dir)
-    JavaFiles.write(f1.toPath(), compact(render(strToWrite)).getBytes())
-    f1.getPath()
-  }
+  implicit val formats = DefaultFormats
 
   test("parsing no resources") {
     val conf = new SparkConf
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"2")
+    conf.set(TASK_GPU_ID.amountConf, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
 
@@ -67,46 +60,45 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
     withTempDir { tmpDir =>
       val testResourceArgs: JObject = ("" -> "")
       val ja = JArray(List(testResourceArgs))
-      val f1 = writeFileWithJson(tmpDir, ja)
+      val f1 = createTempJsonFile(tmpDir, "resources", ja)
       var error = intercept[SparkException] {
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
-      assert(error.contains("Exception parsing the resources in"),
+      assert(error.contains("Error parsing resources file"),
         s"Calling with no resources didn't error as expected, error: $error")
     }
   }
 
-  test("parsing one resources") {
+  test("parsing one resource") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + 
SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"2")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+    conf.set(TASK_GPU_ID.amountConf, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
     val backend = new CoarseGrainedExecutorBackend( env.rpcEnv, "driverurl", 
"1", "host1",
       4, Seq.empty[URL], env, None)
     withTempDir { tmpDir =>
-      val testResourceArgs =
-        ("name" -> "gpu") ~
-        ("addresses" -> Seq("0", "1"))
-      val ja = JArray(List(testResourceArgs))
-      val f1 = writeFileWithJson(tmpDir, ja)
+      val ra = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
+      val ja = Extraction.decompose(Seq(ra))
+      val f1 = createTempJsonFile(tmpDir, "resources", ja)
       val parsedResources = backend.parseOrFindResources(Some(f1))
 
       assert(parsedResources.size === 1)
       assert(parsedResources.get(GPU).nonEmpty)
-      assert(parsedResources.get(GPU).get.name === "gpu")
+      assert(parsedResources.get(GPU).get.name === GPU)
       assert(parsedResources.get(GPU).get.addresses.deep === Array("0", 
"1").deep)
     }
   }
 
   test("parsing multiple resources") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + 
SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"3")
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + 
SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"2")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+    conf.set(TASK_GPU_ID.amountConf, "2")
+    conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
+    conf.set(TASK_FPGA_ID.amountConf, "3")
+
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -114,30 +106,27 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
       4, Seq.empty[URL], env, None)
 
     withTempDir { tmpDir =>
-      val gpuArgs =
-        ("name" -> "gpu") ~
-          ("addresses" -> Seq("0", "1"))
+      val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
       val fpgaArgs =
-        ("name" -> "fpga") ~
-          ("addresses" -> Seq("f1", "f2", "f3"))
-      val ja = JArray(List(gpuArgs, fpgaArgs))
-      val f1 = writeFileWithJson(tmpDir, ja)
+        ResourceAllocation(EXECUTOR_FPGA_ID, Seq("f1", "f2", "f3"))
+      val ja = Extraction.decompose(Seq(gpuArgs, fpgaArgs))
+      val f1 = createTempJsonFile(tmpDir, "resources", ja)
       val parsedResources = backend.parseOrFindResources(Some(f1))
 
       assert(parsedResources.size === 2)
       assert(parsedResources.get(GPU).nonEmpty)
-      assert(parsedResources.get(GPU).get.name === "gpu")
+      assert(parsedResources.get(GPU).get.name === GPU)
       assert(parsedResources.get(GPU).get.addresses.deep === Array("0", 
"1").deep)
       assert(parsedResources.get(FPGA).nonEmpty)
-      assert(parsedResources.get(FPGA).get.name === "fpga")
+      assert(parsedResources.get(FPGA).get.name === FPGA)
       assert(parsedResources.get(FPGA).get.addresses.deep === Array("f1", 
"f2", "f3").deep)
     }
   }
 
   test("error checking parsing resources and executor and task configs") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + 
SPARK_RESOURCE_AMOUNT_SUFFIX, "2")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"2")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+    conf.set(TASK_GPU_ID.amountConf, "2")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -146,13 +135,11 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
 
     // not enough gpu's on the executor
     withTempDir { tmpDir =>
-      val gpuArgs =
-        ("name" -> "gpu") ~
-          ("addresses" -> Seq("0"))
-      val ja = JArray(List(gpuArgs))
-      val f1 = writeFileWithJson(tmpDir, ja)
+      val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0"))
+      val ja = Extraction.decompose(Seq(gpuArgs))
+      val f1 = createTempJsonFile(tmpDir, "resources", ja)
 
-      var error = intercept[SparkException] {
+      var error = intercept[IllegalArgumentException] {
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
@@ -162,24 +149,23 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
 
     // missing resource on the executor
     withTempDir { tmpDir =>
-      val gpuArgs =
-        ("name" -> "fpga") ~
-          ("addresses" -> Seq("0"))
-      val ja = JArray(List(gpuArgs))
-      val f1 = writeFileWithJson(tmpDir, ja)
+      val fpga = ResourceAllocation(EXECUTOR_FPGA_ID, Seq("0"))
+      val ja = Extraction.decompose(Seq(fpga))
+      val f1 = createTempJsonFile(tmpDir, "resources", ja)
 
       var error = intercept[SparkException] {
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
-      assert(error.contains("Resource: gpu required but wasn't discovered on 
startup"))
+      assert(error.contains("User is expecting to use resource: gpu but didn't 
specify a " +
+        "discovery script!"))
     }
   }
 
   test("executor resource found less than required") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + 
SPARK_RESOURCE_AMOUNT_SUFFIX, "4")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + GPU + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"1")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "4")
+    conf.set(TASK_GPU_ID.amountConf, "1")
     val serializer = new JavaSerializer(conf)
     val env = createMockEnv(conf, serializer)
     // we don't really use this, just need it to get at the parser function
@@ -188,33 +174,28 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
 
     // executor resources < required
     withTempDir { tmpDir =>
-      val gpuArgs =
-        ("name" -> "gpu") ~
-          ("addresses" -> Seq("0", "1"))
-      val ja = JArray(List(gpuArgs))
-      val f1 = writeFileWithJson(tmpDir, ja)
+      val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
+      val ja = Extraction.decompose(Seq(gpuArgs))
+      val f1 = createTempJsonFile(tmpDir, "resources", ja)
 
-      var error = intercept[SparkException] {
+      var error = intercept[IllegalArgumentException] {
         val parsedResources = backend.parseOrFindResources(Some(f1))
       }.getMessage()
 
-      assert(error.contains("gpu, with addresses: 0,1 is less than what the 
user requested: 4"))
+      assert(error.contains("Resource: gpu, with addresses: 0,1 is less than 
what the " +
+        "user requested: 4"))
     }
   }
 
-  test("use discoverer") {
+  test("use resource discovery") {
     val conf = new SparkConf
-    conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA + 
SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
-    conf.set(SPARK_TASK_RESOURCE_PREFIX + FPGA + SPARK_RESOURCE_AMOUNT_SUFFIX, 
"3")
+    conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
+    conf.set(TASK_FPGA_ID.amountConf, "3")
     assume(!(Utils.isWindows))
     withTempDir { dir =>
-      val fpgaDiscovery = new File(dir, "resourceDiscoverScriptfpga")
-      Files.write("""echo '{"name": "fpga","addresses":["f1", "f2", "f3"]}'""",
-        fpgaDiscovery, StandardCharsets.UTF_8)
-      JavaFiles.setPosixFilePermissions(fpgaDiscovery.toPath(),
-        EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE))
-      conf.set(SPARK_EXECUTOR_RESOURCE_PREFIX + FPGA +
-        SPARK_RESOURCE_DISCOVERY_SCRIPT_SUFFIX, fpgaDiscovery.getPath())
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath)
 
       val serializer = new JavaSerializer(conf)
       val env = createMockEnv(conf, serializer)
@@ -227,11 +208,43 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
 
       assert(parsedResources.size === 1)
       assert(parsedResources.get(FPGA).nonEmpty)
-      assert(parsedResources.get(FPGA).get.name === "fpga")
+      assert(parsedResources.get(FPGA).get.name === FPGA)
+      assert(parsedResources.get(FPGA).get.addresses.deep === Array("f1", 
"f2", "f3").deep)
+    }
+  }
+
+  test("use resource discovery and allocated file option") {
+    val conf = new SparkConf
+    conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
+    conf.set(TASK_FPGA_ID.amountConf, "3")
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"fpgaDiscoverScript",
+        """{"name": "fpga","addresses":["f1", "f2", "f3"]}""")
+      conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, scriptPath)
+
+      val serializer = new JavaSerializer(conf)
+      val env = createMockEnv(conf, serializer)
+
+      // we don't really use this, just need it to get at the parser function
+      val backend = new CoarseGrainedExecutorBackend(env.rpcEnv, "driverurl", 
"1", "host1",
+        4, Seq.empty[URL], env, None)
+      val gpuArgs = ResourceAllocation(EXECUTOR_GPU_ID, Seq("0", "1"))
+      val ja = Extraction.decompose(Seq(gpuArgs))
+      val f1 = createTempJsonFile(dir, "resources", ja)
+      val parsedResources = backend.parseOrFindResources(Some(f1))
+
+      assert(parsedResources.size === 2)
+      assert(parsedResources.get(GPU).nonEmpty)
+      assert(parsedResources.get(GPU).get.name === GPU)
+      assert(parsedResources.get(GPU).get.addresses.deep === Array("0", 
"1").deep)
+      assert(parsedResources.get(FPGA).nonEmpty)
+      assert(parsedResources.get(FPGA).get.name === FPGA)
       assert(parsedResources.get(FPGA).get.addresses.deep === Array("f1", 
"f2", "f3").deep)
     }
   }
 
+
   test("track allocated resources by taskId") {
     val conf = new SparkConf
     val securityMgr = new SecurityManager(conf)
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 495321f..8edf957 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -45,6 +45,7 @@ import org.apache.spark.internal.config.UI._
 import org.apache.spark.memory.TestMemoryManager
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rdd.RDD
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcTimeout}
 import org.apache.spark.scheduler.{FakeTask, ResultTask, Task, TaskDescription}
 import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceInformationSuite.scala 
b/core/src/test/scala/org/apache/spark/resource/ResourceInformationSuite.scala
new file mode 100644
index 0000000..f5044c2
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/resource/ResourceInformationSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.spark.resource
+
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+
+class ResourceInformationSuite extends SparkFunSuite {
+  test("ResourceInformation.parseJson for valid JSON") {
+    val json1 = compact(render(("name" -> "p100") ~ ("addresses" -> Seq("0", 
"1"))))
+    val info1 = ResourceInformation.parseJson(json1)
+    assert(info1.name === "p100")
+    assert(info1.addresses === Array("0", "1"))
+
+    // Currently we allow empty addresses.
+    val json2 = compact(render("name" -> "fpga"))
+    val info2 = ResourceInformation.parseJson(json2)
+    assert(info2.name === "fpga")
+    assert(info2.addresses.isEmpty)
+
+    val json3 = compact(render("addresses" -> Seq("0")))
+    val json4 = "invalid_json"
+    for (invalidJson <- Seq(json3, json4)) {
+      val ex = intercept[SparkException] {
+        print(ResourceInformation.parseJson(invalidJson))
+      }
+      assert(ex.getMessage.contains("Error parsing JSON into 
ResourceInformation"),
+        "Error message should provide context.")
+      assert(ex.getMessage.contains(invalidJson), "Error message should 
include input json.")
+    }
+  }
+
+  test("ResourceInformation.equals/hashCode") {
+    val a1 = new ResourceInformation("a", addresses = Array("0"))
+    val a21 = new ResourceInformation("a", addresses = Array("0", "1"))
+    val a22 = new ResourceInformation("a", addresses = Array("0", "1"))
+    val b2 = new ResourceInformation("b", addresses = Array("0", "1"))
+    object A2 extends ResourceInformation("a", Array("0", "1"))
+    assert(a1.equals(null) === false)
+    assert(a1.equals(a1))
+    assert(a1.equals(a21) === false)
+    assert(a21.equals(a22) && a21.hashCode() === a22.hashCode())
+    assert(a21.equals(b2) === false)
+    assert(a21.equals(A2) === false)
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
new file mode 100644
index 0000000..51a92e0
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.resource
+
+import java.io.File
+import java.nio.file.{Files => JavaFiles}
+
+import org.json4s.{DefaultFormats, Extraction}
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, 
SparkFunSuite}
+import org.apache.spark.TestUtils._
+import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
+import org.apache.spark.util.Utils
+
+class ResourceUtilsSuite extends SparkFunSuite
+    with LocalSparkContext {
+
+  test("ResourceID") {
+    val componentName = "spark.test"
+    val resourceName = "p100"
+    val id = ResourceID(componentName, resourceName)
+    val confPrefix = s"$componentName.resource.$resourceName."
+    assert(id.confPrefix === confPrefix)
+    assert(id.amountConf === s"${confPrefix}amount")
+    assert(id.discoveryScriptConf === s"${confPrefix}discoveryScript")
+    assert(id.vendorConf === s"${confPrefix}vendor")
+  }
+
+  test("Resource discoverer no addresses errors") {
+    val conf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val scriptPath = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoverScript",
+        """{"name": "gpu"}""")
+      conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+      conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, scriptPath)
+
+      val error = intercept[IllegalArgumentException] {
+        getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, None)
+      }.getMessage()
+      assert(error.contains("Resource: gpu, with " +
+        "addresses:  is less than what the user requested: 2"))
+    }
+  }
+
+  test("Resource discoverer multiple resource types") {
+    val conf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuDiscovery = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
+        """{"name": "gpu", "addresses": ["0", "1"]}""")
+      conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+      conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery)
+
+      val fpgaDiscovery = createTempScriptWithExpectedOutput(dir, 
"fpgDiscoverScript",
+        """{"name": "fpga", "addresses": ["f1", "f2", "f3"]}""")
+      conf.set(EXECUTOR_FPGA_ID.amountConf, "2")
+      conf.set(EXECUTOR_FPGA_ID.discoveryScriptConf, fpgaDiscovery)
+
+      val resources = getOrDiscoverAllResources(conf, SPARK_EXECUTOR_PREFIX, 
None)
+      assert(resources.size === 2)
+      val gpuValue = resources.get(GPU)
+      assert(gpuValue.nonEmpty, "Should have a gpu entry")
+      assert(gpuValue.get.name == "gpu", "name should be gpu")
+      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
+      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 
0,1 entries")
+
+      val fpgaValue = resources.get(FPGA)
+      assert(fpgaValue.nonEmpty, "Should have an fpga entry")
+      assert(fpgaValue.get.name == "fpga", "name should be fpga")
+      assert(fpgaValue.get.addresses.size == 3, "Should have 3 indexes")
+      assert(fpgaValue.get.addresses.deep == Array("f1", "f2", "f3").deep,
+        "should have f1,f2,f3 entries")
+    }
+  }
+
+  test("get from resources file and discover the remaining") {
+    val conf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      implicit val formats = DefaultFormats
+      val fpgaAddrs = Seq("f1", "f2", "f3")
+      val fpgaAllocation = ResourceAllocation(EXECUTOR_FPGA_ID, fpgaAddrs)
+      val resourcesFile = createTempJsonFile(
+        dir, "resources", Extraction.decompose(Seq(fpgaAllocation)))
+      conf.set(EXECUTOR_FPGA_ID.amountConf, "3")
+      val resourcesFromFileOnly = getOrDiscoverAllResources(
+        conf, SPARK_EXECUTOR_PREFIX, Some(resourcesFile))
+      val expectedFpgaInfo = new ResourceInformation(FPGA, fpgaAddrs.toArray)
+      assert(resourcesFromFileOnly(FPGA) === expectedFpgaInfo)
+
+      val gpuDiscovery = createTempScriptWithExpectedOutput(
+        dir, "gpuDiscoveryScript", """{"name": "gpu", "addresses": ["0", 
"1"]}""")
+      conf.set(EXECUTOR_GPU_ID.amountConf, "2")
+      conf.set(EXECUTOR_GPU_ID.discoveryScriptConf, gpuDiscovery)
+      val resourcesFromBoth = getOrDiscoverAllResources(
+        conf, SPARK_EXECUTOR_PREFIX, Some(resourcesFile))
+      val expectedGpuInfo = new ResourceInformation(GPU, Array("0", "1"))
+      assert(resourcesFromBoth(FPGA) === expectedFpgaInfo)
+      assert(resourcesFromBoth(GPU) === expectedGpuInfo)
+    }
+  }
+
+  test("list resource ids") {
+    val conf = new SparkConf
+    conf.set(DRIVER_GPU_ID.amountConf, "2")
+    var resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+    assert(resources.size === 1, "should only have GPU for resource")
+    assert(resources(0).resourceName == GPU, "name should be gpu")
+
+    conf.set(DRIVER_FPGA_ID.amountConf, "2")
+    val resourcesMap = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+      .map{ rId => (rId.resourceName, 1)}.toMap
+    assert(resourcesMap.size === 2, "should have both GPU and FPGA resources")
+    assert(resourcesMap.get(GPU).nonEmpty, "should have GPU")
+    assert(resourcesMap.get(FPGA).nonEmpty, "should have FPGA")
+  }
+
+  test("parse resource request") {
+    val conf = new SparkConf
+    conf.set(DRIVER_GPU_ID.amountConf, "2")
+    var request = parseResourceRequest(conf, DRIVER_GPU_ID)
+    assert(request.id.resourceName === GPU, "should only have GPU for 
resource")
+    assert(request.amount === 2, "GPU count should be 2")
+    assert(request.discoveryScript === None, "discovery script should be 
empty")
+    assert(request.vendor === None, "vendor should be empty")
+
+    val vendor = "nvidia.com"
+    val discoveryScript = "discoveryScriptGPU"
+    conf.set(DRIVER_GPU_ID.discoveryScriptConf, discoveryScript)
+    conf.set(DRIVER_GPU_ID.vendorConf, vendor)
+    request = parseResourceRequest(conf, DRIVER_GPU_ID)
+    assert(request.id.resourceName === GPU, "should only have GPU for 
resource")
+    assert(request.amount === 2, "GPU count should be 2")
+    assert(request.discoveryScript.get === discoveryScript, "should get 
discovery script")
+    assert(request.vendor.get === vendor, "should get vendor")
+
+    conf.remove(DRIVER_GPU_ID.amountConf)
+    val error = intercept[SparkException] {
+      request = parseResourceRequest(conf, DRIVER_GPU_ID)
+    }.getMessage()
+
+    assert(error.contains("You must specify an amount for gpu"))
+  }
+
+  test("Resource discoverer multiple gpus on driver") {
+    val conf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuDiscovery = createTempScriptWithExpectedOutput(dir, 
"gpuDisocveryScript",
+        """{"name": "gpu", "addresses": ["0", "1"]}""")
+      conf.set(DRIVER_GPU_ID.amountConf, "2")
+      conf.set(DRIVER_GPU_ID.discoveryScriptConf, gpuDiscovery)
+
+      // make sure it reads from correct config, here it should use driver
+      val resources = getOrDiscoverAllResources(conf, SPARK_DRIVER_PREFIX, 
None)
+      val gpuValue = resources.get(GPU)
+      assert(gpuValue.nonEmpty, "Should have a gpu entry")
+      assert(gpuValue.get.name == "gpu", "name should be gpu")
+      assert(gpuValue.get.addresses.size == 2, "Should have 2 indexes")
+      assert(gpuValue.get.addresses.deep == Array("0", "1").deep, "should have 
0,1 entries")
+    }
+  }
+
+  test("Resource discoverer script returns mismatched name") {
+    val conf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuDiscovery = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
+        """{"name": "fpga", "addresses": ["0", "1"]}""")
+      val request =
+        ResourceRequest(
+          DRIVER_GPU_ID,
+          2,
+          Some(gpuDiscovery),
+          None)
+
+      val error = intercept[SparkException] {
+        discoverResource(request)
+      }.getMessage()
+
+      assert(error.contains(s"Error running the resource discovery script 
$gpuDiscovery: " +
+        "script returned resource name fpga and we were expecting gpu"))
+    }
+  }
+
+  test("Resource discoverer script returns invalid format") {
+    val conf = new SparkConf
+    assume(!(Utils.isWindows))
+    withTempDir { dir =>
+      val gpuDiscovery = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoverScript",
+        """{"addresses": ["0", "1"]}""")
+
+      val request =
+        ResourceRequest(
+          EXECUTOR_GPU_ID,
+          2,
+          Some(gpuDiscovery),
+          None)
+
+      val error = intercept[SparkException] {
+        discoverResource(request)
+      }.getMessage()
+
+      assert(error.contains("Error parsing JSON into ResourceInformation"))
+    }
+  }
+
+  test("Resource discoverer script doesn't exist") {
+    val conf = new SparkConf
+    withTempDir { dir =>
+      val file1 = new File(dir, "bogusfilepath")
+      try {
+        val request =
+          ResourceRequest(
+            EXECUTOR_GPU_ID,
+            2,
+            Some(file1.getPath()),
+            None)
+
+        val error = intercept[SparkException] {
+          discoverResource(request)
+        }.getMessage()
+
+        assert(error.contains("doesn't exist"))
+      } finally {
+        JavaFiles.deleteIfExists(file1.toPath())
+      }
+    }
+  }
+
+  test("gpu's specified but not a discovery script") {
+    val request = ResourceRequest(EXECUTOR_GPU_ID, 2, None, None)
+
+    val error = intercept[SparkException] {
+      discoverResource(request)
+    }.getMessage()
+
+    assert(error.contains("User is expecting to use resource: gpu but " +
+      "didn't specify a discovery script!"))
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/ResourceName.scala 
b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala
similarity index 57%
rename from core/src/test/scala/org/apache/spark/ResourceName.scala
rename to core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala
index 6efe064..6d2c07d 100644
--- a/core/src/test/scala/org/apache/spark/ResourceName.scala
+++ b/core/src/test/scala/org/apache/spark/resource/TestResourceIDs.scala
@@ -15,10 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.spark
+package org.apache.spark.resource
 
-private[spark] object ResourceName {
-  // known types of resources
-  final val GPU: String = "gpu"
-  final val FPGA: String = "fpga"
+import org.apache.spark.internal.config.{SPARK_DRIVER_PREFIX, 
SPARK_EXECUTOR_PREFIX, SPARK_TASK_PREFIX}
+import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
+
+object TestResourceIDs {
+  val DRIVER_GPU_ID = ResourceID(SPARK_DRIVER_PREFIX, GPU)
+  val EXECUTOR_GPU_ID = ResourceID(SPARK_EXECUTOR_PREFIX, GPU)
+  val TASK_GPU_ID = ResourceID(SPARK_TASK_PREFIX, GPU)
+
+  val DRIVER_FPGA_ID = ResourceID(SPARK_DRIVER_PREFIX, FPGA)
+  val EXECUTOR_FPGA_ID = ResourceID(SPARK_EXECUTOR_PREFIX, FPGA)
+  val TASK_FPGA_ID = ResourceID(SPARK_TASK_PREFIX, FPGA)
 }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 70d368e..3edbbeb 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -32,10 +32,12 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.mockito.MockitoSugar._
 
 import org.apache.spark._
-import org.apache.spark.ResourceName.GPU
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.rdd.RDD
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -187,12 +189,12 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
 
     val conf = new SparkConf()
       .set(EXECUTOR_CORES, 3)
-      .set(SPARK_EXECUTOR_RESOURCE_PREFIX + GPU + 
SPARK_RESOURCE_AMOUNT_SUFFIX, "3")
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive 
during test
       .setMaster(
       
"coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
       .setAppName("test")
-    setTaskResourceRequirement(conf, GPU, 1)
+    conf.set(TASK_GPU_ID.amountConf, "1")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "1")
 
     sc = new SparkContext(conf)
     val backend = 
sc.schedulerBackend.asInstanceOf[TestCoarseGrainedSchedulerBackend]
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
index b4ff730..0109d1f 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkException, SparkFunSuite}
-import org.apache.spark.ResourceName.GPU
+import org.apache.spark.resource.ResourceUtils.GPU
 
 class ExecutorResourceInfoSuite extends SparkFunSuite {
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
index 233bc73..5839532f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -23,9 +23,9 @@ import java.util.Properties
 
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.ResourceInformation
-import org.apache.spark.ResourceName.GPU
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceUtils.GPU
 
 class TaskDescriptionSuite extends SparkFunSuite {
   test("encoding and then decoding a TaskDescription results in the same 
TaskDescription") {
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 9a4c7a9..d1b1616 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -29,9 +29,10 @@ import org.scalatest.concurrent.Eventually
 import org.scalatest.mockito.MockitoSugar
 
 import org.apache.spark._
-import org.apache.spark.ResourceName.GPU
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.util.ManualClock
 
 class FakeSchedulerBackend extends SchedulerBackend {
@@ -1249,12 +1250,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext with B
     val taskGpus = 1
     val executorGpus = 4
     val executorCpus = 4
+
     val taskScheduler = setupScheduler(numCores = executorCpus,
       config.CPUS_PER_TASK.key -> taskCpus.toString,
-      
s"${config.SPARK_TASK_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}"
 ->
-        taskGpus.toString,
-      
s"${config.SPARK_EXECUTOR_RESOURCE_PREFIX}${GPU}${config.SPARK_RESOURCE_AMOUNT_SUFFIX}"
 ->
-        executorGpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
       config.EXECUTOR_CORES.key -> executorCpus.toString)
     val taskSet = FakeTask.createTaskSet(3)
 
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 3d09b10..da566dd 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -27,9 +27,10 @@ import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 
 import org.apache.spark._
-import org.apache.spark.ResourceName.GPU
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.{AccumulatorV2, ManualClock}
@@ -1639,7 +1640,7 @@ class TaskSetManagerSuite extends SparkFunSuite with 
LocalSparkContext with Logg
     import TestUtils._
 
     sc = new SparkContext("local", "test")
-    setTaskResourceRequirement(sc.conf, GPU, 2)
+    sc.conf.set(TASK_GPU_ID.amountConf, "2")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = FakeTask.createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index ddad02a..b1b7751 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -32,8 +32,8 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{SPARK_RESOURCE_AMOUNT_SUFFIX, SPARK_RESOURCE_VENDOR_SUFFIX}
 import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.resource.ResourceUtils
 import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.Utils.getHadoopFileSystem
 
@@ -226,20 +226,14 @@ private[spark] object KubernetesUtils extends Logging {
   def buildResourcesQuantities(
       componentName: String,
       sparkConf: SparkConf): Map[String, Quantity] = {
-    val allResources = sparkConf.getAllWithPrefix(componentName)
-    val vendors = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_VENDOR_SUFFIX).toMap
-    val amounts = SparkConf.getConfigsWithSuffix(allResources, SPARK_RESOURCE_AMOUNT_SUFFIX).toMap
-    val uniqueResources = SparkConf.getBaseOfConfigs(allResources)
-
-    uniqueResources.map { rName =>
-      val vendorDomain = vendors.get(rName).getOrElse(throw new SparkException("Resource: " +
-        s"$rName was requested, but vendor was not specified."))
-      val amount = amounts.get(rName).getOrElse(throw new SparkException(s"Resource: $rName " +
-        "was requested, but count was not specified."))
+    val requests = ResourceUtils.parseAllResourceRequests(sparkConf, componentName)
+    requests.map { request =>
+      val vendorDomain = request.vendor.getOrElse(throw new SparkException("Resource: " +
+        s"${request.id.resourceName} was requested, but vendor was not specified."))
       val quantity = new QuantityBuilder(false)
-        .withAmount(amount)
+        .withAmount(request.amount.toString)
         .build()
-      (KubernetesConf.buildKubernetesResourceName(vendorDomain, rName), quantity)
+      (KubernetesConf.buildKubernetesResourceName(vendorDomain, request.id.resourceName), quantity)
     }.toMap
   }
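Note: buildResourcesQuantities now delegates conf parsing to ResourceUtils.parseAllResourceRequests and reads id.resourceName, amount and the optional vendor off each request. A hedged sketch of that flow; the wildcard import mirrors the touched suites, and the exact package holding SPARK_EXECUTOR_PREFIX is an assumption since it is not visible in this hunk:

    import org.apache.spark.SparkConf
    import org.apache.spark.internal.config._  // assumed home of SPARK_EXECUTOR_PREFIX
    import org.apache.spark.resource.{ResourceID, ResourceUtils}

    // Sketch only: request two GPUs per executor via the new conf-key helpers,
    // then parse the requests back the way the Kubernetes code above does.
    val conf = new SparkConf()
    val gpuId = ResourceID(SPARK_EXECUTOR_PREFIX, "gpu")
    conf.set(gpuId.amountConf, "2")
    conf.set(gpuId.vendorConf, "nvidia.com")
    ResourceUtils.parseAllResourceRequests(conf, SPARK_EXECUTOR_PREFIX).foreach { req =>
      // Fields relied on above: resource name, amount, and an Option[String] vendor.
      println(s"${req.id.resourceName}: amount=${req.amount}, vendor=${req.vendor}")
    }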
 
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
index d10f69f..1944ba9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala
@@ -89,7 +89,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     }
 
     val driverResourceQuantities =
-      KubernetesUtils.buildResourcesQuantities(SPARK_DRIVER_RESOURCE_PREFIX, conf.sparkConf)
+      KubernetesUtils.buildResourcesQuantities(SPARK_DRIVER_PREFIX, conf.sparkConf)
 
     val driverPort = conf.sparkConf.getInt(DRIVER_PORT.key, DEFAULT_DRIVER_PORT)
     val driverBlockManagerPort = conf.sparkConf.getInt(
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index d46a9b8..d648755 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -96,7 +96,7 @@ private[spark] class BasicExecutorFeatureStep(
       .build()
 
     val executorResourceQuantities =
-      KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_RESOURCE_PREFIX,
+      KubernetesUtils.buildResourcesQuantities(SPARK_EXECUTOR_PREFIX,
         kubernetesConf.sparkConf)
 
     val executorEnv: Seq[EnvVar] = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
index df326a2..3706721 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala
@@ -28,6 +28,8 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.UI._
+import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.util.Utils
 
 class BasicDriverFeatureStepSuite extends SparkFunSuite {
@@ -45,7 +47,9 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
     }
 
   test("Check the pod respects all configurations from the user.") {
-    val resources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
+    val resourceID = ResourceID(SPARK_DRIVER_PREFIX, GPU)
+    val resources =
+      Map(("nvidia.com/gpu" -> TestResourceInformation(resourceID, "2", "nvidia.com")))
     val sparkConf = new SparkConf()
       .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
       .set(DRIVER_CORES, 2)
@@ -55,12 +59,8 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite {
       .set(CONTAINER_IMAGE, "spark-driver:latest")
       .set(IMAGE_PULL_SECRETS, TEST_IMAGE_PULL_SECRETS)
     resources.foreach { case (_, testRInfo) =>
-      sparkConf.set(
-        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
-        testRInfo.count)
-      sparkConf.set(
-        s"${SPARK_DRIVER_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
-        testRInfo.vendor)
+      sparkConf.set(testRInfo.rId.amountConf, testRInfo.count)
+      sparkConf.set(testRInfo.rId.vendorConf, testRInfo.vendor)
     }
     val kubernetesConf = KubernetesTestConf.createDriverConf(
       sparkConf = sparkConf,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index eb5532e..51067bd 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -34,6 +34,9 @@ import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestReso
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python._
+import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEndpointAddress
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 import org.apache.spark.util.Utils
@@ -93,13 +96,7 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("test spark resource missing vendor") {
-    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
-    // test missing vendor
-    gpuResources.foreach { case (_, testRInfo) =>
-      baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
-        testRInfo.count)
-    }
+    baseConf.set(EXECUTOR_GPU_ID.amountConf, "2")
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val error = intercept[SparkException] {
       val executor = step.configurePod(SparkPod.initialPod())
@@ -108,30 +105,24 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("test spark resource missing amount") {
-    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")))
-    // test missing count
-    gpuResources.foreach { case (_, testRInfo) =>
-      baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
-        testRInfo.vendor)
-    }
+    baseConf.set(EXECUTOR_GPU_ID.vendorConf, "nvidia.com")
+
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val error = intercept[SparkException] {
       val executor = step.configurePod(SparkPod.initialPod())
     }.getMessage()
-    assert(error.contains("Resource: gpu was requested, but count was not 
specified"))
+    assert(error.contains("You must specify an amount for gpu"))
   }
 
   test("basic executor pod with resources") {
-    val gpuResources = Map(("nvidia.com/gpu" -> TestResourceInformation("gpu", "2", "nvidia.com")),
-      ("foo.com/fpga" -> TestResourceInformation("fpga", "f1", "foo.com")))
+    val fpgaResourceID = ResourceID(SPARK_EXECUTOR_PREFIX, FPGA)
+    val gpuExecutorResourceID = ResourceID(SPARK_EXECUTOR_PREFIX, GPU)
+    val gpuResources =
+      Map(("nvidia.com/gpu" -> TestResourceInformation(gpuExecutorResourceID, 
"2", "nvidia.com")),
+      ("foo.com/fpga" -> TestResourceInformation(fpgaResourceID, "1", 
"foo.com")))
     gpuResources.foreach { case (_, testRInfo) =>
-      baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}",
-        testRInfo.count)
-      baseConf.set(
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${testRInfo.rName}${SPARK_RESOURCE_VENDOR_SUFFIX}",
-        testRInfo.vendor)
+      baseConf.set(testRInfo.rId.amountConf, testRInfo.count)
+      baseConf.set(testRInfo.rId.vendorConf, testRInfo.vendor)
     }
     val step = new BasicExecutorFeatureStep(newExecutorConf(), new SecurityManager(baseConf))
     val executor = step.configurePod(SparkPod.initialPod())
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
index e8be3b0..284887f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala
@@ -25,6 +25,7 @@ import org.mockito.Mockito.{mock, when}
 import org.mockito.invocation.InvocationOnMock
 
 import org.apache.spark.deploy.k8s.SparkPod
+import org.apache.spark.resource.ResourceID
 
 object KubernetesFeaturesTestUtils {
 
@@ -67,5 +68,5 @@ object KubernetesFeaturesTestUtils {
     list.filter(_.getClass() == desired).map(_.asInstanceOf[T])
   }
 
-  case class TestResourceInformation(rName: String, count: String, vendor: String)
+  case class TestResourceInformation(rId: ResourceID, count: String, vendor: String)
 }
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
index b70be7a..79a57ad 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
@@ -36,10 +36,11 @@ import org.mockito.ArgumentMatchers.{any, anyLong, eq => meq}
 import org.mockito.Mockito._
 import org.scalatest.mockito.MockitoSugar
 
-import org.apache.spark.{LocalSparkContext, ResourceInformation, SparkConf, SparkContext,
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext,
   SparkFunSuite}
 import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.executor.MesosExecutorBackend
+import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
   TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.scheduler.cluster.ExecutorInfo
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 0eade29..5b361d1 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -248,7 +248,7 @@ private[spark] class Client(
         sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
       }
     val amResources = yarnAMResources ++
-      getYarnResourcesFromSparkResources(SPARK_DRIVER_RESOURCE_PREFIX, sparkConf)
+      getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
     logDebug(s"AM resources: $amResources")
     val appContext = newApp.getApplicationSubmissionContext
     appContext.setApplicationName(sparkConf.get("spark.app.name", "Spark"))
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index 7480dd8..cb0c68d 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceID
 import org.apache.spark.util.{CausedBy, Utils}
 
 /**
@@ -68,13 +69,13 @@ private object ResourceRequestHelper extends Logging {
       (AM_CORES.key, YARN_AM_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (DRIVER_CORES.key, YARN_DRIVER_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
       (EXECUTOR_CORES.key, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "cpu-vcores"),
-      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}",
+      (ResourceID(SPARK_EXECUTOR_PREFIX, "fpga").amountConf,
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
-      (s"${SPARK_DRIVER_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}",
+      (ResourceID(SPARK_DRIVER_PREFIX, "fpga").amountConf,
         s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
-      (s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}",
+      (ResourceID(SPARK_EXECUTOR_PREFIX, "gpu").amountConf,
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
-      (s"${SPARK_DRIVER_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}",
+      (ResourceID(SPARK_DRIVER_PREFIX, "gpu").amountConf,
         s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))
 
     val errorMessage = new mutable.StringBuilder()
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 069b287..6e634b9 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -143,7 +143,7 @@ private[yarn] class YarnAllocator(
 
   private val executorResourceRequests =
     sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++
-      getYarnResourcesFromSparkResources(SPARK_EXECUTOR_RESOURCE_PREFIX, sparkConf)
+      getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
 
   // Resource capability requested for each executor
   private[yarn] val resource: Resource = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 0f11820..6b87eec 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -26,8 +26,9 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P
 import org.apache.hadoop.yarn.util.ConverterUtils
 
 import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.internal.config._
 import org.apache.spark.launcher.YarnCommandBuilderUtils
+import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.util.Utils
 
 object YarnSparkHadoopUtil {
@@ -53,10 +54,9 @@ object YarnSparkHadoopUtil {
       confPrefix: String,
       sparkConf: SparkConf
       ): Map[String, String] = {
-    Map("gpu" -> YARN_GPU_RESOURCE_CONFIG, "fpga" -> 
YARN_FPGA_RESOURCE_CONFIG).map {
+    Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> 
YARN_FPGA_RESOURCE_CONFIG).map {
       case (rName, yarnName) =>
-        val resourceCountSparkConf = 
s"${confPrefix}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}"
-        (yarnName -> sparkConf.get(resourceCountSparkConf, "0"))
+        (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, 
"0"))
     }.filter { case (_, count) => count.toLong > 0 }
   }
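Note: getYarnResourcesFromSparkResources now derives the per-resource amount key from ResourceID(confPrefix, rName).amountConf instead of concatenating prefix and suffix strings. A usage sketch under stated assumptions (the value of YARN_GPU_RESOURCE_CONFIG, the helper's visibility, and the package holding SPARK_EXECUTOR_PREFIX are not shown in this hunk):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
    import org.apache.spark.internal.config._  // assumed home of SPARK_EXECUTOR_PREFIX
    import org.apache.spark.resource.ResourceID

    // Sketch only: a conf requesting 3 GPUs per executor should surface as a
    // single YARN resource entry with amount "3"; resources left at 0 are filtered out.
    val conf = new SparkConf()
      .set(ResourceID(SPARK_EXECUTOR_PREFIX, "gpu").amountConf, "3")
    val yarnResources: Map[String, String] =
      YarnSparkHadoopUtil.getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, conf)
    yarnResources.foreach { case (yarnName, amount) => println(s"$yarnName -> $amount") }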
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
index 380da36..2e5748b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/executor/YarnCoarseGrainedExecutorBackend.scala
@@ -68,7 +68,8 @@ private[spark] object YarnCoarseGrainedExecutorBackend extends Logging {
     val createFn: (RpcEnv, CoarseGrainedExecutorBackend.Arguments, SparkEnv) =>
       CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env) =>
       new YarnCoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, arguments.executorId,
-        arguments.hostname, arguments.cores, arguments.userClassPath, env, arguments.resourcesFile)
+        arguments.hostname, arguments.cores, arguments.userClassPath, env,
+        arguments.resourcesFileOpt)
     }
     val backendArgs = CoarseGrainedExecutorBackend.parseArguments(args,
       this.getClass.getCanonicalName.stripSuffix("$"))
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 884e0f5..d5f1992 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -23,7 +23,6 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.{HashMap => MutableHashMap}
-import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -42,6 +41,7 @@ import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceID
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
 class ClientSuite extends SparkFunSuite with Matchers {
@@ -400,7 +400,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
     }
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+      conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
     }
 
     val error = intercept[SparkException] {
@@ -423,7 +423,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
     }
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+      conf.set(ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3")
     }
 
     val error = intercept[SparkException] {
@@ -447,7 +447,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
 
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     resources.values.foreach { rName =>
-      conf.set(s"${SPARK_DRIVER_RESOURCE_PREFIX}${rName}${SPARK_RESOURCE_AMOUNT_SUFFIX}", "3")
+      conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
     }
     // also just set yarn one that we don't convert
     conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 0040369..ca89af2 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config._
+import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.ManualClock
@@ -187,10 +188,9 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource)
     ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
     val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-
     val sparkResources =
-      Map(s"${SPARK_EXECUTOR_RESOURCE_PREFIX}gpu${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "3",
-        s"${SPARK_EXECUTOR_RESOURCE_PREFIX}fpga${SPARK_RESOURCE_AMOUNT_SUFFIX}" -> "2",
+      Map(EXECUTOR_GPU_ID.amountConf -> "3",
+        EXECUTOR_FPGA_ID.amountConf -> "2",
         s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
     val handler = createAllocator(1, mockAmClient, sparkResources)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
