This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ae2e00e8f8e9 [SPARK-45527][CORE] Use fraction to do the resource 
calculation
ae2e00e8f8e9 is described below

commit ae2e00e8f8e96fe85ecd5539aac0803216b8e66f
Author: Bobby Wang <wbo4...@gmail.com>
AuthorDate: Thu Jan 4 09:19:22 2024 -0600

    [SPARK-45527][CORE] Use fraction to do the resource calculation
    
    ### What changes were proposed in this pull request?
    
    This (PR) introduces the utilization of fractions instead of slots, which 
is similar to the CPU strategy,
    for determining whether a worker offer can provide the necessary resources 
to tasks.
    
    For instance, when an executor reports to the driver with [gpu, ["1,", 
"2"]], the driver constructs an executor data map.
    The keys in the map represent the GPU addresses, and their default values 
are set to 1.0, indicating one whole GPU.
    
    Consequently, the available resource amounts for the executor are as 
follows: { "1" -> 1.0f, "2" -> 1.0f }.
    
    When offering resources to a task that requires 1 CPU and 0.08 GPU, the 
worker offer examines the available resource amounts.
    It identifies that the capacity of GPU address "1.0" is greater than the 
task's GPU requirement (1.0 >= 0.08).
    Therefore, Spark assigns the GPU address "1" to this task. After the 
assignment, the available resource amounts
    for this executor are updated to { "1" -> 0.92, "2" -> 1.0}, ensuring that 
the remaining resources can be allocated to other tasks.
    
    In scenarios where other tasks, using different task resource profiles, 
request varying GPU amounts
    when dynamic allocation is disabled, Spark applies the same comparison 
approach. It compares the task's GPU requirement with
    the available resource amounts to determine if the resources can be 
assigned to the task.
    
    ### Why are the changes needed?
    
    The existing resources offering including gpu, fpga is based on "slots per 
address", which is defined by the default resource profile.
    and it's a fixed number for all different resource profiles when dynamic 
allcation is disabled.
    
    Consider the below test case,
    
    ``` scala
      withTempDir { dir =>
        val scriptPath = createTempScriptWithExpectedOutput(dir, 
"gpuDiscoveryScript",
          """{"name": "gpu","addresses":["0"]}""")
    
        val conf = new SparkConf()
          .setAppName("test")
          .setMaster("local-cluster[1, 12, 1024]")
          .set("spark.executor.cores", "12")
    
        conf.set("spark.worker.resource.gpu.amount", "1")
        conf.set("spark.worker.resource.gpu.discoveryScript", scriptPath)
        conf.set("spark.executor.resource.gpu.amount", "1")
        conf.set("spark.task.resource.gpu.amount", "0.08")
    
        sc = new SparkContext(conf)
        val rdd = sc.range(0, 100, 1, 4)
        var rdd1 = rdd.repartition(3)
        val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1.0)
        val rp = new ResourceProfileBuilder().require(treqs).build
        rdd1 = rdd1.withResources(rp)
        assert(rdd1.collect().size === 100)
      }
    ```
    
    During the initial stages, Spark generates a default resource profile based 
on the configurations. The calculation
    for determining the slots per GPU address is performed as 
"spark.executor.resource.gpu.amount / spark.task.resource.gpu.amount",
    resulting in a value of 12 (1/0.08 = 12). This means that Spark can 
accommodate up to 12 tasks running on each GPU address simultaneously.
    
    The job is then divided into two stages. The first stage, which consists of 
4 tasks, runs concurrently based on
    the default resource profile. However, the second stage, comprising 3 
tasks, runs sequentially using a new task
    resource profile. This new profile specifies that each task requires 1 CPU 
and 1.0 full GPU.
    
    In reality, the tasks in the second stage are running in parallel, which is 
the underlying issue.
    
    The problem lies in the line `new 
TaskResourceRequests().cpus(1).resource("gpu", 1.0)`. The value of 1.0
    for the GPU, or any value below 1.0 (specifically, (0, 0.5] which is 
rounded up to 1.0, spark throws an exception if the value is in (0.5, 1)),
    is merely requesting the number of slots. In this case, it is requesting 
only 1 slot. Consequently, each task
    necessitates 1 CPU core and 1 GPU slot, resulting in all tasks running 
simultaneously.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    To ensure all tests got passed
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43494 from wbo4958/SPARK-45527.
    
    Authored-by: Bobby Wang <wbo4...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/deploy/master/WorkerInfo.scala    |  22 +-
 .../executor/CoarseGrainedExecutorBackend.scala    |  15 +-
 .../scala/org/apache/spark/executor/Executor.scala |   6 +-
 .../apache/spark/resource/ResourceAllocator.scala  | 118 +++--
 .../apache/spark/resource/ResourceProfile.scala    |  28 +-
 .../org/apache/spark/resource/ResourceUtils.scala  |  13 +-
 .../spark/resource/TaskResourceRequest.scala       |   6 +-
 .../spark/scheduler/ExecutorResourceInfo.scala     |  21 +-
 .../spark/scheduler/ExecutorResourcesAmounts.scala | 217 ++++++++
 .../apache/spark/scheduler/TaskDescription.scala   |  39 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  51 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |   8 +-
 .../org/apache/spark/scheduler/WorkerOffer.scala   |   4 +-
 .../cluster/CoarseGrainedClusterMessage.scala      |   4 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  28 +-
 .../scala/org/apache/spark/SparkConfSuite.scala    |   2 +-
 .../CoarseGrainedExecutorBackendSuite.scala        |  36 +-
 .../org/apache/spark/executor/ExecutorSuite.scala  |   2 +-
 .../spark/resource/ResourceProfileSuite.scala      |  36 +-
 .../apache/spark/resource/ResourceUtilsSuite.scala | 247 ++++++++++
 .../CoarseGrainedSchedulerBackendSuite.scala       |   5 +-
 .../scheduler/ExecutorResourceInfoSuite.scala      | 132 ++++-
 .../scheduler/ExecutorResourcesAmountsSuite.scala  | 545 +++++++++++++++++++++
 .../spark/scheduler/TaskDescriptionSuite.scala     |  21 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 477 +++++++++++++++++-
 .../spark/scheduler/TaskSetManagerSuite.scala      |  10 +-
 26 files changed, 1840 insertions(+), 253 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 4d475fa8a6e8..bdc9e9c6106c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -20,8 +20,8 @@ package org.apache.spark.deploy.master
 import scala.collection.mutable
 
 import org.apache.spark.resource.{ResourceAllocator, ResourceInformation, 
ResourceRequirement}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.util.ArrayImplicits._
 import org.apache.spark.util.Utils
 
 private[spark] case class WorkerResourceInfo(name: String, addresses: 
Seq[String])
@@ -29,12 +29,20 @@ private[spark] case class WorkerResourceInfo(name: String, 
addresses: Seq[String
 
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
-  override protected def slotsPerAddress: Int = 1
 
+  /**
+   * Acquire the resources.
+   * @param amount How many addresses are requesting.
+   * @return ResourceInformation
+   */
   def acquire(amount: Int): ResourceInformation = {
-    val allocated = availableAddrs.take(amount)
-    acquire(allocated)
-    new ResourceInformation(resourceName, allocated.toArray)
+    // Any available address from availableAddrs must be a whole resource
+    // since worker needs to do full resources to the executors.
+    val addresses = availableAddrs.take(amount)
+    assert(addresses.length == amount)
+
+    acquire(addresses.map(addr => addr -> ONE_ENTIRE_RESOURCE).toMap)
+    new ResourceInformation(resourceName, addresses.toArray)
   }
 }
 
@@ -163,7 +171,7 @@ private[spark] class WorkerInfo(
    */
   def recoverResources(expected: Map[String, ResourceInformation]): Unit = {
     expected.foreach { case (rName, rInfo) =>
-      resources(rName).acquire(rInfo.addresses.toImmutableArraySeq)
+      resources(rName).acquire(rInfo.addresses.map(addr => addr -> 
ONE_ENTIRE_RESOURCE).toMap)
     }
   }
 
@@ -173,7 +181,7 @@ private[spark] class WorkerInfo(
    */
   private def releaseResources(allocated: Map[String, ResourceInformation]): 
Unit = {
     allocated.foreach { case (rName, rInfo) =>
-      resources(rName).release(rInfo.addresses.toImmutableArraySeq)
+      resources(rName).release(rInfo.addresses.map(addrs => addrs -> 
ONE_ENTIRE_RESOURCE).toMap)
     }
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 366b481bf6a4..b507d27f14c4 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -20,7 +20,6 @@ package org.apache.spark.executor
 import java.net.URL
 import java.nio.ByteBuffer
 import java.util.Locale
-import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
 import scala.util.{Failure, Success}
@@ -65,16 +64,6 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private var _resources = Map.empty[String, ResourceInformation]
 
-  /**
-   * Map each taskId to the information about the resource allocated to it, 
Please refer to
-   * [[ResourceInformation]] for specifics.
-   * CHM is used to ensure thread-safety 
(https://issues.apache.org/jira/browse/SPARK-45227)
-   * Exposed for testing only.
-   */
-  private[executor] val taskResources = new ConcurrentHashMap[
-    Long, Map[String, ResourceInformation]
-  ]
-
   private var decommissioned = false
 
   // Track the last time in ns that at least one task is running. If no task 
is running and all
@@ -192,7 +181,6 @@ private[spark] class CoarseGrainedExecutorBackend(
       } else {
         val taskDesc = TaskDescription.decode(data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
-        taskResources.put(taskDesc.taskId, taskDesc.resources)
         executor.launchTask(this, taskDesc)
       }
 
@@ -272,11 +260,10 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): 
Unit = {
-    val resources = taskResources.getOrDefault(taskId, Map.empty[String, 
ResourceInformation])
+    val resources = executor.runningTasks.get(taskId).taskDescription.resources
     val cpus = executor.runningTasks.get(taskId).taskDescription.cpus
     val msg = StatusUpdate(executorId, taskId, state, data, cpus, resources)
     if (TaskState.isFinished(state)) {
-      taskResources.remove(taskId)
       lastTaskFinishTime.set(System.nanoTime())
     }
     driver match {
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 12471915cd97..dae00a72285d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -619,13 +619,17 @@ private[spark] class Executor(
           threadMXBean.getCurrentThreadCpuTime
         } else 0L
         var threwException = true
+        // Convert resources amounts info to ResourceInformation
+        val resources = taskDescription.resources.map { case (rName, 
addressesAmounts) =>
+          rName -> new ResourceInformation(rName, 
addressesAmounts.keys.toSeq.sorted.toArray)
+        }
         val value = Utils.tryWithSafeFinally {
           val res = task.run(
             taskAttemptId = taskId,
             attemptNumber = taskDescription.attemptNumber,
             metricsSystem = env.metricsSystem,
             cpus = taskDescription.cpus,
-            resources = taskDescription.resources,
+            resources = resources,
             plugins = plugins)
           threwException = false
           res
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
index 7b97d9704282..e9bb11721725 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceAllocator.scala
@@ -20,6 +20,49 @@ package org.apache.spark.resource
 import scala.collection.mutable
 
 import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+private[spark] object ResourceAmountUtils {
+  /**
+   * Using "double" to do the resource calculation may encounter a problem of 
precision loss. Eg
+   *
+   * scala&gt; val taskAmount = 1.0 / 9
+   * taskAmount: Double = 0.1111111111111111
+   *
+   * scala&gt; var total = 1.0
+   * total: Double = 1.0
+   *
+   * scala&gt; for (i &lt;- 1 to 9 ) {
+   * |   if (total &gt;= taskAmount) {
+   * |           total -= taskAmount
+   * |           println(s"assign $taskAmount for task $i, total left: $total")
+   * |   } else {
+   * |           println(s"ERROR Can't assign $taskAmount for task $i, total 
left: $total")
+   * |   }
+   * | }
+   * assign 0.1111111111111111 for task 1, total left: 0.8888888888888888
+   * assign 0.1111111111111111 for task 2, total left: 0.7777777777777777
+   * assign 0.1111111111111111 for task 3, total left: 0.6666666666666665
+   * assign 0.1111111111111111 for task 4, total left: 0.5555555555555554
+   * assign 0.1111111111111111 for task 5, total left: 0.44444444444444425
+   * assign 0.1111111111111111 for task 6, total left: 0.33333333333333315
+   * assign 0.1111111111111111 for task 7, total left: 0.22222222222222204
+   * assign 0.1111111111111111 for task 8, total left: 0.11111111111111094
+   * ERROR Can't assign 0.1111111111111111 for task 9, total left: 
0.11111111111111094
+   *
+   * So we multiply ONE_ENTIRE_RESOURCE to convert the double to long to avoid 
this limitation.
+   * Double can display up to 16 decimal places, so we set the factor to
+   * 10, 000, 000, 000, 000, 000L.
+   */
+  final val ONE_ENTIRE_RESOURCE: Long = 10000000000000000L
+
+  def isOneEntireResource(amount: Long): Boolean = amount == 
ONE_ENTIRE_RESOURCE
+
+  def toInternalResource(amount: Double): Long = (amount * 
ONE_ENTIRE_RESOURCE).toLong
+
+  def toFractionalResource(amount: Long): Double = amount.toDouble / 
ONE_ENTIRE_RESOURCE
+
+}
 
 /**
  * Trait used to help executor/worker allocate resources.
@@ -29,59 +72,53 @@ private[spark] trait ResourceAllocator {
 
   protected def resourceName: String
   protected def resourceAddresses: Seq[String]
-  protected def slotsPerAddress: Int
 
   /**
-   * Map from an address to its availability, a value > 0 means the address is 
available,
-   * while value of 0 means the address is fully assigned.
-   *
-   * For task resources ([[org.apache.spark.scheduler.ExecutorResourceInfo]]), 
this value
-   * can be a multiple, such that each address can be allocated up to 
[[slotsPerAddress]]
-   * times.
+   * Map from an address to its availability default to 1.0 (we multiply 
ONE_ENTIRE_RESOURCE
+   * to avoid precision error), a value &gt; 0 means the address is available, 
while value of
+   * 0 means the address is fully assigned.
    */
   private lazy val addressAvailabilityMap = {
-    mutable.HashMap(resourceAddresses.map(_ -> slotsPerAddress): _*)
+    mutable.HashMap(resourceAddresses.map(address => address -> 
ONE_ENTIRE_RESOURCE): _*)
   }
 
   /**
-   * Sequence of currently available resource addresses.
-   *
-   * With [[slotsPerAddress]] greater than 1, [[availableAddrs]] can contain 
duplicate addresses
-   * e.g. with [[slotsPerAddress]] == 2, availableAddrs for addresses 0 and 1 
can look like
-   * Seq("0", "0", "1"), where address 0 has two assignments available, and 1 
has one.
+   * Get the amounts of resources that have been multiplied by 
ONE_ENTIRE_RESOURCE.
+   * @return the resources amounts
+   */
+  def resourcesAmounts: Map[String, Long] = addressAvailabilityMap.toMap
+
+  /**
+   * Sequence of currently available resource addresses which are not fully 
assigned.
    */
   def availableAddrs: Seq[String] = addressAvailabilityMap
-    .flatMap { case (addr, available) =>
-      (0 until available).map(_ => addr)
-    }.toSeq.sorted
+    .filter(addresses => addresses._2 > 0).keys.toSeq.sorted
 
   /**
    * Sequence of currently assigned resource addresses.
-   *
-   * With [[slotsPerAddress]] greater than 1, [[assignedAddrs]] can contain 
duplicate addresses
-   * e.g. with [[slotsPerAddress]] == 2, assignedAddrs for addresses 0 and 1 
can look like
-   * Seq("0", "1", "1"), where address 0 was assigned once, and 1 was assigned 
twice.
    */
   private[spark] def assignedAddrs: Seq[String] = addressAvailabilityMap
-    .flatMap { case (addr, available) =>
-      (0 until slotsPerAddress - available).map(_ => addr)
-    }.toSeq.sorted
+    .filter(addresses => addresses._2 < ONE_ENTIRE_RESOURCE).keys.toSeq.sorted
 
   /**
    * Acquire a sequence of resource addresses (to a launched task), these 
addresses must be
    * available. When the task finishes, it will return the acquired resource 
addresses.
    * Throw an Exception if an address is not available or doesn't exist.
    */
-  def acquire(addrs: Seq[String]): Unit = {
-    addrs.foreach { address =>
-      val isAvailable = addressAvailabilityMap.getOrElse(address,
+  def acquire(addressesAmounts: Map[String, Long]): Unit = {
+    addressesAmounts.foreach { case (address, amount) =>
+      val prevAmount = addressAvailabilityMap.getOrElse(address,
         throw new SparkException(s"Try to acquire an address that doesn't 
exist. $resourceName " +
-        s"address $address doesn't exist."))
-      if (isAvailable > 0) {
-        addressAvailabilityMap(address) -= 1
+          s"address $address doesn't exist."))
+
+      val left = prevAmount - amount
+
+      if (left < 0) {
+        throw new SparkException(s"Try to acquire $resourceName address 
$address " +
+          s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}, but 
only " +
+          s"${ResourceAmountUtils.toFractionalResource(prevAmount)} left.")
       } else {
-        throw new SparkException("Try to acquire an address that is not 
available. " +
-          s"$resourceName address $address is not available.")
+        addressAvailabilityMap(address) = left
       }
     }
   }
@@ -91,16 +128,21 @@ private[spark] trait ResourceAllocator {
    * addresses are released when a task has finished.
    * Throw an Exception if an address is not assigned or doesn't exist.
    */
-  def release(addrs: Seq[String]): Unit = {
-    addrs.foreach { address =>
-      val isAvailable = addressAvailabilityMap.getOrElse(address,
+  def release(addressesAmounts: Map[String, Long]): Unit = {
+    addressesAmounts.foreach { case (address, amount) =>
+      val prevAmount = addressAvailabilityMap.getOrElse(address,
         throw new SparkException(s"Try to release an address that doesn't 
exist. $resourceName " +
           s"address $address doesn't exist."))
-      if (isAvailable < slotsPerAddress) {
-        addressAvailabilityMap(address) += 1
+
+      val total = prevAmount + amount
+
+      if (total > ONE_ENTIRE_RESOURCE) {
+        throw new SparkException(s"Try to release $resourceName address 
$address " +
+          s"amount: ${ResourceAmountUtils.toFractionalResource(amount)}. But 
the total amount: " +
+          s"${ResourceAmountUtils.toFractionalResource(total)} " +
+          s"after release should be <= 1")
       } else {
-        throw new SparkException(s"Try to release an address that is not 
assigned. $resourceName " +
-          s"address $address is not assigned.")
+        addressAvailabilityMap(address) = total
       }
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 69c0672562c2..4a55b2f619e6 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -49,6 +49,8 @@ class ResourceProfile(
     val executorResources: Map[String, ExecutorResourceRequest],
     val taskResources: Map[String, TaskResourceRequest]) extends Serializable 
with Logging {
 
+  validate()
+
   // _id is only a var for testing purposes
   private var _id = ResourceProfile.getNextProfileId
   // This is used for any resources that use fractional amounts, the key is 
the resource name
@@ -59,6 +61,19 @@ class ResourceProfile(
   private var _maxTasksPerExecutor: Option[Int] = None
   private var _coresLimitKnown: Boolean = false
 
+  /**
+   * Validate the ResourceProfile
+   */
+  protected def validate(): Unit = {
+    // The task.amount in ResourceProfile falls within the range of 0 to 0.5,
+    // or it's a whole number
+    for ((_, taskReq) <- taskResources) {
+      val taskAmount = taskReq.amount
+      assert(taskAmount <= 0.5 || taskAmount % 1 == 0,
+        s"The task resource amount ${taskAmount} must be either <= 0.5, or a 
whole number.")
+    }
+  }
+
   /**
    * A unique id of this ResourceProfile
    */
@@ -105,12 +120,8 @@ class ResourceProfile(
 
   /*
    * This function takes into account fractional amounts for the task resource 
requirement.
-   * Spark only supports fractional amounts < 1 to basically allow for 
multiple tasks
-   * to use the same resource address.
-   * The way the scheduler handles this is it adds the same address the number 
of slots per
-   * address times and then the amount becomes 1. This way it re-uses the same 
address
-   * the correct number of times. ie task requirement amount=0.25 -> 
addrs["0", "0", "0", "0"]
-   * and scheduler task amount=1. See ResourceAllocator.slotsPerAddress.
+   * Spark only supports fractional amounts &lt; 1 to basically allow for 
multiple tasks
+   * to use the same resource address or a whole number to use the multiple 
whole addresses.
    */
   private[spark] def getSchedulerTaskResourceAmount(resource: String): Int = {
     val taskAmount = taskResources.getOrElse(resource,
@@ -280,6 +291,11 @@ private[spark] class TaskResourceProfile(
     override val taskResources: Map[String, TaskResourceRequest])
   extends ResourceProfile(Map.empty, taskResources) {
 
+  // The task.amount in TaskResourceProfile falls within the range of 0 to 1.0,
+  // or it's a whole number, and it has been checked in the 
TaskResourceRequest.
+  // Therefore, we can safely skip this check.
+  override protected def validate(): Unit = {}
+
   override protected[spark] def getCustomExecutorResources()
       : Map[String, ExecutorResourceRequest] = {
     if (SparkEnv.get == null) {
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index a6f2ac35af7a..8718ce8ea083 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -169,18 +169,17 @@ private[spark] object ResourceUtils extends Logging {
 
   // Used to take a fraction amount from a task resource requirement and split 
into a real
   // integer amount and the number of slots per address. For instance, if the 
amount is 0.5,
-  // the we get (1, 2) back out. This indicates that for each 1 address, it 
has 2 slots per
-  // address, which allows you to put 2 tasks on that address. Note if amount 
is greater
-  // than 1, then the number of slots per address has to be 1. This would 
indicate that a
-  // would have multiple addresses assigned per task. This can be used for 
calculating
-  // the number of tasks per executor -> (executorAmount * numParts) / 
(integer amount).
+  // the we get (1, 2) back out. This indicates that for each 1 address, it 
allows you to
+  // put 2 tasks on that address. Note if amount is greater than 1, then the 
number of
+  // running tasks per address has to be 1. This can be used for calculating
+  // the number of tasks per executor = (executorAmount * numParts) / (integer 
amount).
   // Returns tuple of (integer amount, numParts)
   def calculateAmountAndPartsForFraction(doubleAmount: Double): (Int, Int) = {
-    val parts = if (doubleAmount <= 0.5) {
+    val parts = if (doubleAmount <= 1.0) {
       Math.floor(1.0 / doubleAmount).toInt
     } else if (doubleAmount % 1 != 0) {
       throw new SparkException(
-        s"The resource amount ${doubleAmount} must be either <= 0.5, or a 
whole number.")
+        s"The resource amount ${doubleAmount} must be either <= 1.0, or a 
whole number.")
     } else {
       1
     }
diff --git 
a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala 
b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
index cbd57808213a..9fc0d93373b5 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequest.scala
@@ -28,7 +28,7 @@ import org.apache.spark.annotation.{Evolving, Since}
  *
  * @param resourceName Resource name
  * @param amount Amount requesting as a Double to support fractional resource 
requests.
- *               Valid values are less than or equal to 0.5 or whole numbers. 
This essentially
+ *               Valid values are less than or equal to 1.0 or whole numbers. 
This essentially
  *               lets you configure X number of tasks to run on a single 
resource,
  *               ie amount equals 0.5 translates into 2 tasks per resource 
address.
  */
@@ -37,8 +37,8 @@ import org.apache.spark.annotation.{Evolving, Since}
 class TaskResourceRequest(val resourceName: String, val amount: Double)
   extends Serializable {
 
-  assert(amount <= 0.5 || amount % 1 == 0,
-    s"The resource amount ${amount} must be either <= 0.5, or a whole number.")
+  assert(amount <= 1.0 || amount % 1 == 0,
+    s"The resource amount ${amount} must be either <= 1.0, or a whole number.")
 
   override def equals(obj: Any): Boolean = {
     obj match {
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
index 508c6cebd9fe..d9fbd23f3aa4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourceInfo.scala
@@ -25,16 +25,27 @@ import org.apache.spark.resource.{ResourceAllocator, 
ResourceInformation}
  * information.
  * @param name Resource name
  * @param addresses Resource addresses provided by the executor
- * @param numParts Number of ways each resource is subdivided when scheduling 
tasks
  */
 private[spark] class ExecutorResourceInfo(
     name: String,
-    addresses: Seq[String],
-    numParts: Int)
+    addresses: Seq[String])
   extends ResourceInformation(name, addresses.toArray) with ResourceAllocator {
 
   override protected def resourceName = this.name
   override protected def resourceAddresses = this.addresses
-  override protected def slotsPerAddress: Int = numParts
-  def totalAddressAmount: Int = resourceAddresses.length * slotsPerAddress
+
+  /**
+   * Calculate how many parts the executor can offer according to the task 
resource amount
+   * @param taskAmount how many resource amount the task required
+   * @return the total parts
+   */
+  def totalParts(taskAmount: Double): Int = {
+    assert(taskAmount > 0.0)
+    if (taskAmount >= 1.0) {
+      addresses.length / taskAmount.ceil.toInt
+    } else {
+      addresses.length * Math.floor(1.0 / taskAmount).toInt
+    }
+  }
+
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala
new file mode 100644
index 000000000000..a93f2863ac2f
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/ExecutorResourcesAmounts.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.SparkException
+import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfile}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+
+/**
+ * Class to hold information about a series of resources belonging to an 
executor.
+ * A resource could be a GPU, FPGA, etc. And it is used as a temporary
+ * class to calculate the resources amounts when offering resources to
+ * the tasks in the [[TaskSchedulerImpl]]
+ *
+ * One example is GPUs, where the addresses would be the indices of the GPUs
+ *
+ * @param resources The executor available resources and amount. eg,
+ *                  Map("gpu" -> Map("0" -> 
ResourceAmountUtils.toInternalResource(0.2),
+ *                                   "1" -> 
ResourceAmountUtils.toInternalResource(1.0)),
+ *                  "fpga" -> Map("a" -> 
ResourceAmountUtils.toInternalResource(0.3),
+ *                                "b" -> 
ResourceAmountUtils.toInternalResource(0.9))
+ *                  )
+ */
+private[spark] class ExecutorResourcesAmounts(
+    private val resources: Map[String, Map[String, Long]]) extends 
Serializable {
+
+  /**
+   * convert the resources to be mutable HashMap
+   */
+  private val internalResources: Map[String, HashMap[String, Long]] = {
+    resources.map { case (rName, addressAmounts) =>
+      rName -> HashMap(addressAmounts.toSeq: _*)
+    }
+  }
+
+  /**
+   * The total address count of each resource. Eg,
+   * Map("gpu" -> Map("0" -> ResourceAmountUtils.toInternalResource(0.5),
+   *                  "1" -> ResourceAmountUtils.toInternalResource(0.5),
+   *                  "2" -> ResourceAmountUtils.toInternalResource(0.5)),
+   *     "fpga" -> Map("a" -> ResourceAmountUtils.toInternalResource(0.5),
+   *                   "b" -> ResourceAmountUtils.toInternalResource(0.5)))
+   * the resourceAmount will be Map("gpu" -> 3, "fpga" -> 2)
+   */
+  lazy val resourceAddressAmount: Map[String, Int] = internalResources.map {
+    case (rName, addressMap) => rName -> addressMap.size
+  }
+
+  /**
+   * For testing purpose. convert internal resources back to the "fraction" 
resources.
+   */
+  private[spark] def availableResources: Map[String, Map[String, Double]] = {
+    internalResources.map { case (rName, addressMap) =>
+      rName -> addressMap.map { case (address, amount) =>
+        address -> ResourceAmountUtils.toFractionalResource(amount)
+      }.toMap
+    }
+  }
+
+  /**
+   * Acquire the resource.
+   * @param assignedResource the assigned resource information
+   */
+  def acquire(assignedResource: Map[String, Map[String, Long]]): Unit = {
+    assignedResource.foreach { case (rName, taskResAmounts) =>
+      val availableResourceAmounts = internalResources.getOrElse(rName,
+        throw new SparkException(s"Try to acquire an address from $rName that 
doesn't exist"))
+      taskResAmounts.foreach { case (address, amount) =>
+        val prevInternalTotalAmount = 
availableResourceAmounts.getOrElse(address,
+          throw new SparkException(s"Try to acquire an address that doesn't 
exist. $rName " +
+            s"address $address doesn't exist."))
+
+        val left = prevInternalTotalAmount - amount
+        if (left < 0) {
+          throw new SparkException(s"The total amount " +
+            s"${ResourceAmountUtils.toFractionalResource(left)} " +
+            s"after acquiring $rName address $address should be >= 0")
+        }
+        internalResources(rName)(address) = left
+      }
+    }
+  }
+
+  /**
+   * Release the assigned resources to the resource pool
+   * @param assignedResource resource to be released
+   */
+  def release(assignedResource: Map[String, Map[String, Long]]): Unit = {
+    assignedResource.foreach { case (rName, taskResAmounts) =>
+      val availableResourceAmounts = internalResources.getOrElse(rName,
+        throw new SparkException(s"Try to release an address from $rName that 
doesn't exist"))
+      taskResAmounts.foreach { case (address, amount) =>
+        val prevInternalTotalAmount = 
availableResourceAmounts.getOrElse(address,
+          throw new SparkException(s"Try to release an address that is not 
assigned. $rName " +
+            s"address $address is not assigned."))
+        val total = prevInternalTotalAmount + amount
+        if (total > ONE_ENTIRE_RESOURCE) {
+          throw new SparkException(s"The total amount " +
+            s"${ResourceAmountUtils.toFractionalResource(total)} " +
+            s"after releasing $rName address $address should be <= 1.0")
+        }
+        internalResources(rName)(address) = total
+      }
+    }
+  }
+
+  /**
+   * Try to assign the addresses according to the task requirement. This 
function always goes
+   * through the available resources starting from the "small" address. If the 
resources amount
+   * of the address is matching the task requirement, we will assign this 
address to this task.
+   * Eg, assuming the available resources are {"gpu" -&gt; {"0"-&gt; 0.7, "1" 
-&gt; 1.0}) and the
+   * task requirement is 0.5, this function will return Some(Map("gpu" -&gt; 
{"0" -&gt; 0.5})).
+   *
+   * TODO: as we consistently allocate addresses beginning from the "small" 
address, it can
+   * potentially result in an undesired consequence where a portion of the 
resource is being wasted.
+   * Eg, assuming the available resources are {"gpu" -&gt; {"0"-&gt; 1.0, "1" 
-&gt; 0.5}) and the
+   * task amount requirement is 0.5, this function will return
+   * Some(Map("gpu" -&gt; {"0" -&gt; 0.5})), and the left available resource 
will be
+   * {"gpu" -&gt; {"0"-&gt; 0.5, "1" -&gt; 0.5}) which can't assign to the 
task that
+   * requires &gt; 0.5 any more.
+   *
+   * @param taskSetProf assign resources based on which resource profile
+   * @return the optional assigned resources amounts. returns None if any
+   *         of the task requests for resources aren't met.
+   */
+  def assignAddressesCustomResources(taskSetProf: ResourceProfile):
+      Option[Map[String, Map[String, Long]]] = {
+    // only look at the resource other than cpus
+    val tsResources = taskSetProf.getCustomTaskResources()
+    if (tsResources.isEmpty) {
+      return Some(Map.empty)
+    }
+
+    val allocatedAddresses = HashMap[String, Map[String, Long]]()
+
+    // Go through all resources here so that we can make sure they match and 
also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      // TaskResourceRequest checks the task amount should be in (0, 1] or a 
whole number
+      var taskAmount = taskReqs.amount
+
+      internalResources.get(rName) match {
+        case Some(addressesAmountMap) =>
+          val allocatedAddressesMap = HashMap[String, Long]()
+
+          // Always sort the addresses
+          val addresses = addressesAmountMap.keys.toSeq.sorted
+
+          // task.amount is a whole number
+          if (taskAmount >= 1.0) {
+            for (address <- addresses if taskAmount > 0) {
+              // The address is still a whole resource
+              if 
(ResourceAmountUtils.isOneEntireResource(addressesAmountMap(address))) {
+                taskAmount -= 1.0
+                // Assign the full resource of the address
+                allocatedAddressesMap(address) = ONE_ENTIRE_RESOURCE
+              }
+            }
+          } else if (taskAmount > 0.0) { // 0 < task.amount < 1.0
+            val internalTaskAmount = 
ResourceAmountUtils.toInternalResource(taskAmount)
+            for (address <- addresses if taskAmount > 0) {
+              if (addressesAmountMap(address) >= internalTaskAmount) {
+                // Assign the part of the address.
+                allocatedAddressesMap(address) = internalTaskAmount
+                taskAmount = 0
+              }
+            }
+          }
+
+          if (taskAmount == 0 && allocatedAddressesMap.size > 0) {
+            allocatedAddresses.put(rName, allocatedAddressesMap.toMap)
+          } else {
+            return None
+          }
+
+        case None => return None
+      }
+    }
+    Some(allocatedAddresses.toMap)
+  }
+
+}
+
+private[spark] object ExecutorResourcesAmounts {
+
+  /**
+   * Create an empty ExecutorResourcesAmounts
+   */
+  def empty: ExecutorResourcesAmounts = new ExecutorResourcesAmounts(Map.empty)
+
+  /**
+   * Converts executor infos to ExecutorResourcesAmounts
+   */
+  def apply(executorInfos: Map[String, ExecutorResourceInfo]): 
ExecutorResourcesAmounts = {
+    new ExecutorResourcesAmounts(
+      executorInfos.map { case (rName, rInfo) => rName -> 
rInfo.resourcesAmounts }
+    )
+  }
+
+}
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 75032086ead7..df5f32612bea 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -23,11 +23,10 @@ import java.nio.charset.StandardCharsets
 import java.util.Properties
 
 import scala.collection.immutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
+import scala.collection.mutable.{HashMap, Map}
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{JobArtifactSet, JobArtifactState}
-import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, 
Utils}
 
 /**
@@ -57,7 +56,10 @@ private[spark] class TaskDescription(
     val artifacts: JobArtifactSet,
     val properties: Properties,
     val cpus: Int,
-    val resources: immutable.Map[String, ResourceInformation],
+    // resources is the total resources assigned to the task
+    // Eg, Map("gpu" -> Map("0" -> 
ResourceAmountUtils.toInternalResource(0.7))):
+    // assign 0.7 of the gpu address "0" to this task
+    val resources: immutable.Map[String, immutable.Map[String, Long]],
     val serializedTask: ByteBuffer) {
 
   assert(cpus > 0, "CPUs per task should be > 0")
@@ -74,14 +76,16 @@ private[spark] object TaskDescription {
     }
   }
 
-  private def serializeResources(map: immutable.Map[String, 
ResourceInformation],
+  private def serializeResources(map: immutable.Map[String, 
immutable.Map[String, Long]],
       dataOut: DataOutputStream): Unit = {
     dataOut.writeInt(map.size)
-    map.foreach { case (key, value) =>
-      dataOut.writeUTF(key)
-      dataOut.writeUTF(value.name)
-      dataOut.writeInt(value.addresses.length)
-      value.addresses.foreach(dataOut.writeUTF(_))
+    map.foreach { case (rName, addressAmountMap) =>
+      dataOut.writeUTF(rName)
+      dataOut.writeInt(addressAmountMap.size)
+      addressAmountMap.foreach { case (address, amount) =>
+        dataOut.writeUTF(address)
+        dataOut.writeLong(amount)
+      }
     }
   }
 
@@ -172,21 +176,22 @@ private[spark] object TaskDescription {
   }
 
   private def deserializeResources(dataIn: DataInputStream):
-      immutable.Map[String, ResourceInformation] = {
-    val map = new HashMap[String, ResourceInformation]()
+      immutable.Map[String, immutable.Map[String, Long]] = {
+    val map = new HashMap[String, immutable.Map[String, Long]]()
     val mapSize = dataIn.readInt()
     var i = 0
     while (i < mapSize) {
       val resType = dataIn.readUTF()
-      val name = dataIn.readUTF()
-      val numIdentifier = dataIn.readInt()
-      val identifiers = new ArrayBuffer[String](numIdentifier)
+      val addressAmountMap = new HashMap[String, Long]()
+      val addressAmountSize = dataIn.readInt()
       var j = 0
-      while (j < numIdentifier) {
-        identifiers += dataIn.readUTF()
+      while (j < addressAmountSize) {
+        val address = dataIn.readUTF()
+        val amount = dataIn.readLong()
+        addressAmountMap(address) = amount
         j += 1
       }
-      map(resType) = new ResourceInformation(name, identifiers.toArray)
+      map.put(resType, addressAmountMap.toMap)
       i += 1
     }
     map.toMap
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 454e7ed3ce61..21f62097a4bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.util.Random
 
 import com.google.common.cache.CacheBuilder
@@ -35,7 +35,7 @@ import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
@@ -389,7 +389,7 @@ private[spark] class TaskSchedulerImpl(
       maxLocality: TaskLocality,
       shuffledOffers: Seq[WorkerOffer],
       availableCpus: Array[Int],
-      availableResources: Array[Map[String, Buffer[String]]],
+      availableResources: Array[ExecutorResourcesAmounts],
       tasks: IndexedSeq[ArrayBuffer[TaskDescription]])
     : (Boolean, Option[TaskLocality]) = {
     var noDelayScheduleRejects = true
@@ -429,13 +429,7 @@ private[spark] class TaskSchedulerImpl(
               minLaunchedLocality = minTaskLocality(minLaunchedLocality, 
Some(locality))
               availableCpus(i) -= taskCpus
               assert(availableCpus(i) >= 0)
-              resources.foreach { case (rName, rInfo) =>
-                // Remove the first n elements from availableResources 
addresses, these removed
-                // addresses are the same as that we allocated in 
taskResourceAssignments since it's
-                // synchronized. We don't remove the exact addresses allocated 
because the current
-                // approach produces the identical result with less time 
complexity.
-                availableResources(i)(rName).remove(0, rInfo.addresses.length)
-              }
+              availableResources(i).acquire(resources)
             }
           } catch {
             case e: TaskNotSerializableException =>
@@ -468,33 +462,14 @@ private[spark] class TaskSchedulerImpl(
   private def resourcesMeetTaskRequirements(
       taskSet: TaskSetManager,
       availCpus: Int,
-      availWorkerResources: Map[String, Buffer[String]]
-      ): Option[Map[String, ResourceInformation]] = {
+      availWorkerResources: ExecutorResourcesAmounts): Option[Map[String, 
Map[String, Long]]] = {
     val rpId = taskSet.taskSet.resourceProfileId
     val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
     val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, 
conf)
     // check if the ResourceProfile has cpus first since that is common case
     if (availCpus < taskCpus) return None
     // only look at the resource other than cpus
-    val tsResources = taskSetProf.getCustomTaskResources()
-    if (tsResources.isEmpty) return Some(Map.empty)
-    val localTaskReqAssign = HashMap[String, ResourceInformation]()
-    // we go through all resources here so that we can make sure they match 
and also get what the
-    // assignments are for the next task
-    for ((rName, taskReqs) <- tsResources) {
-      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
-      availWorkerResources.get(rName) match {
-        case Some(workerRes) =>
-          if (workerRes.size >= taskAmount) {
-            localTaskReqAssign.put(rName, new ResourceInformation(rName,
-              workerRes.take(taskAmount).toArray))
-          } else {
-            return None
-          }
-        case None => return None
-      }
-    }
-    Some(localTaskReqAssign.toMap)
+    availWorkerResources.assignAddressesCustomResources(taskSetProf)
   }
 
   private def minTaskLocality(
@@ -576,13 +551,8 @@ private[spark] class TaskSchedulerImpl(
       // value is -1
       val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
         val rpId = taskSet.taskSet.resourceProfileId
-        val availableResourcesAmount = availableResources.map { resourceMap =>
-          // available addresses already takes into account if there are 
fractional
-          // task resource requests
-          resourceMap.map { case (name, addresses) => (name, addresses.length) 
}
-        }
-        calculateAvailableSlots(this, conf, rpId, resourceProfileIds, 
availableCpus,
-          availableResourcesAmount)
+        val resAmounts = availableResources.map(_.resourceAddressAmount)
+        calculateAvailableSlots(this, conf, rpId, resourceProfileIds, 
availableCpus, resAmounts)
       } else {
         -1
       }
@@ -715,9 +685,8 @@ private[spark] class TaskSchedulerImpl(
               barrierPendingLaunchTasks.foreach { task =>
                 // revert all assigned resources
                 availableCpus(task.assignedOfferIndex) += task.assignedCores
-                task.assignedResources.foreach { case (rName, rInfo) =>
-                  
availableResources(task.assignedOfferIndex)(rName).appendAll(rInfo.addresses)
-                }
+                availableResources(task.assignedOfferIndex).release(
+                  task.assignedResources)
                 // re-add the task to the schedule pending list
                 taskSet.addPendingTask(task.index)
               }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index d17e6735c4ec..e15ba28eeda0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -21,7 +21,6 @@ import java.io.NotSerializableException
 import java.nio.ByteBuffer
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, 
TimeUnit}
 
-import scala.collection.immutable.Map
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.math.max
 import scala.util.control.NonFatal
@@ -33,7 +32,6 @@ import org.apache.spark.TaskState.TaskState
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
-import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.SchedulingMode._
 import org.apache.spark.util.{AccumulatorV2, Clock, LongAccumulator, 
SystemClock, Utils}
 import org.apache.spark.util.collection.PercentileHeap
@@ -444,7 +442,7 @@ private[spark] class TaskSetManager(
       host: String,
       maxLocality: TaskLocality.TaskLocality,
       taskCpus: Int = sched.CPUS_PER_TASK,
-      taskResourceAssignments: Map[String, ResourceInformation] = Map.empty)
+      taskResourceAssignments: Map[String, Map[String, Long]] = Map.empty)
     : (Option[TaskDescription], Boolean, Int) =
   {
     val offerExcluded = taskSetExcludelistHelperOpt.exists { excludeList =>
@@ -512,7 +510,7 @@ private[spark] class TaskSetManager(
       taskLocality: TaskLocality.Value,
       speculative: Boolean,
       taskCpus: Int,
-      taskResourceAssignments: Map[String, ResourceInformation],
+      taskResourceAssignments: Map[String, Map[String, Long]],
       launchTime: Long): TaskDescription = {
     // Found a task; do some bookkeeping and return a task description
     val task = tasks(index)
@@ -1381,7 +1379,7 @@ private[scheduler] case class BarrierPendingLaunchTask(
     host: String,
     index: Int,
     taskLocality: TaskLocality.TaskLocality,
-    assignedResources: Map[String, ResourceInformation]) {
+    assignedResources: Map[String, Map[String, Long]]) {
   // Stored the corresponding index of the WorkerOffer which is responsible to 
launch the task.
   // Used to revert the assigned resources (e.g., cores, custome resources) 
when the barrier
   // task set doesn't launch successfully in a single resourceOffers round.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala 
b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
index 92a12f13576c..a7d63a8949e1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.scheduler
 
-import scala.collection.mutable.Buffer
-
 import org.apache.spark.resource.ResourceProfile
 
 /**
@@ -32,5 +30,5 @@ case class WorkerOffer(
     // `address` is an optional hostPort string, it provide more useful 
information than `host`
     // when multiple executors are launched on the same host.
     address: Option[String] = None,
-    resources: Map[String, Buffer[String]] = Map.empty,
+    resources: ExecutorResourcesAmounts = ExecutorResourcesAmounts.empty,
     resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index b49f5269169d..1f452ae7d109 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -80,7 +80,7 @@ private[spark] object CoarseGrainedClusterMessages {
       state: TaskState,
       data: SerializableBuffer,
       taskCpus: Int,
-      resources: Map[String, ResourceInformation] = Map.empty)
+      resources: Map[String, Map[String, Long]] = Map.empty)
     extends CoarseGrainedClusterMessage
 
   object StatusUpdate {
@@ -91,7 +91,7 @@ private[spark] object CoarseGrainedClusterMessages {
         state: TaskState,
         data: ByteBuffer,
         taskCpus: Int,
-        resources: Map[String, ResourceInformation]): StatusUpdate = {
+        resources: Map[String, Map[String, Long]]): StatusUpdate = {
       StatusUpdate(executorId, taskId, state, new SerializableBuffer(data), 
taskCpus, resources)
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 831fbd45edd7..7e124302c726 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -172,9 +172,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           executorDataMap.get(executorId) match {
             case Some(executorInfo) =>
               executorInfo.freeCores += taskCpus
-              resources.foreach { case (k, v) =>
-                executorInfo.resourcesInfo.get(k).foreach { r =>
-                  r.release(v.addresses.toImmutableArraySeq)
+              resources.foreach { case (rName, addressAmount) =>
+                executorInfo.resourcesInfo.get(rName).foreach { r =>
+                  r.release(addressAmount)
                 }
               }
               makeOffers(executorId)
@@ -271,12 +271,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
           val resourcesInfo = resources.map { case (rName, info) =>
-            // tell the executor it can schedule resources up to 
numSlotsPerAddress times,
-            // as configured by the user, or set to 1 as that is the default 
(1 task/resource)
-            val numParts = scheduler.sc.resourceProfileManager
-              
.resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
-            (info.name,
-              new ExecutorResourceInfo(info.name, 
info.addresses.toImmutableArraySeq, numParts))
+            (info.name, new ExecutorResourceInfo(info.name, 
info.addresses.toIndexedSeq))
           }
           // If we've requested the executor figure out when we did.
           val reqTs: Option[Long] = 
CoarseGrainedSchedulerBackend.this.synchronized {
@@ -385,9 +380,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     private def buildWorkerOffer(executorId: String, executorData: 
ExecutorData) = {
-      val resources = executorData.resourcesInfo.map { case (rName, rInfo) =>
-        (rName, rInfo.availableAddrs.toBuffer)
-      }
+      val resources = ExecutorResourcesAmounts(executorData.resourcesInfo)
       WorkerOffer(
         executorId,
         executorData.executorHost,
@@ -446,11 +439,9 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           // Do resources allocation here. The allocated resources will get 
released after the task
           // finishes.
           executorData.freeCores -= task.cpus
-          task.resources.foreach { case (rName, rInfo) =>
-            assert(executorData.resourcesInfo.contains(rName))
-            
executorData.resourcesInfo(rName).acquire(rInfo.addresses.toImmutableArraySeq)
+          task.resources.foreach { case (rName, addressAmounts) =>
+            executorData.resourcesInfo(rName).acquire(addressAmounts)
           }
-
           logDebug(s"Launching task ${task.taskId} on executor id: 
${task.executorId} hostname: " +
             s"${executorData.executorHost}.")
 
@@ -766,7 +757,10 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           (
             executor.resourceProfileId,
             executor.totalCores,
-            executor.resourcesInfo.map { case (name, rInfo) => (name, 
rInfo.totalAddressAmount) }
+            executor.resourcesInfo.map { case (name, rInfo) =>
+              val taskAmount = rp.taskResources.get(name).get.amount
+              (name, rInfo.totalParts(taskAmount))
+            }
           )
         }.unzip3
     }
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 5f7958167507..8eb8d9c6dc85 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -465,7 +465,7 @@ class SparkConfSuite extends SparkFunSuite with 
LocalSparkContext with ResetSyst
       case (ratio, slots) =>
         val conf = new SparkConf()
         conf.set(TASK_GPU_ID.amountConf, ratio.toString)
-        if (ratio > 0.5 && ratio % 1 != 0) {
+        if (ratio > 1.0 && ratio % 1 != 0) {
           assertThrows[SparkException] {
             parseResourceRequirements(conf, SPARK_TASK_PREFIX)
           }
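
The test change above reflects the relaxed validation: fractional task amounts are now accepted up to one whole resource. A hedged one-liner of the rule, written as an illustration rather than the real parseResourceRequirements check:

``` scala
// Sketch of the assumed rule: fractions above 1.0 (e.g. 1.5) are rejected,
// fractions up to 1.0 and whole numbers of any size are accepted.
def validateTaskAmount(amount: Double): Unit = {
  require(amount <= 1.0 || amount % 1 == 0,
    s"The resource amount $amount must be either <= 1.0, or a whole number.")
}
```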
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 45c27aea6022..a8c9550c6b76 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -304,14 +304,15 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
         backend = new CoarseGrainedExecutorBackend(env.rpcEnv, rpcEnv.address.hostPort, "1",
         "host1", "host1", 4, env, None,
           resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
-      assert(backend.taskResources.isEmpty)
 
       val taskId = 1000000L
+      val resourcesAmounts = Map(GPU -> Map(
+        "0" -> ResourceAmountUtils.toInternalResource(0.15),
+        "1" -> ResourceAmountUtils.toInternalResource(0.76)))
       // We don't really verify the data, just pass it around.
       val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
      val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19,
-        1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1,
-        Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
+        1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data)
       val serializedTaskDescription = TaskDescription.encode(taskDescription)
       backend.rpcEnv.setupEndpoint("Executor 1", backend)
       backend.executor = mock[Executor](CALLS_REAL_METHODS)
@@ -342,21 +343,22 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       // Launching a new task shall add an entry to the `taskResources` map.
       backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
       eventually(timeout(10.seconds)) {
-        assert(backend.taskResources.size == 1)
         assert(runningTasks.size == 1)
-        val resources = backend.taskResources.get(taskId)
-        assert(resources(GPU).addresses sameElements Array("0", "1"))
+        val resources = backend.executor.runningTasks.get(taskId).taskDescription.resources
+        assert(resources(GPU).keys.toArray.sorted sameElements Array("0", "1"))
+        assert(executor.runningTasks.get(taskId).taskDescription.resources
+          === resourcesAmounts)
       }
 
       // Updating the status of a running task shall not affect the `taskResources` map.
       backend.statusUpdate(taskId, TaskState.RUNNING, data)
-      assert(backend.taskResources.size == 1)
-      val resources = backend.taskResources.get(taskId)
-      assert(resources(GPU).addresses sameElements Array("0", "1"))
+      val resources = backend.executor.runningTasks.get(taskId).taskDescription.resources
+      assert(resources(GPU).keys.toArray.sorted sameElements Array("0", "1"))
+      assert(executor.runningTasks.get(taskId).taskDescription.resources
+        === resourcesAmounts)
 
       // Updating the status of a finished task shall remove the entry from the `taskResources` map.
       backend.statusUpdate(taskId, TaskState.FINISHED, data)
-      assert(backend.taskResources.isEmpty)
     } finally {
       if (backend != null) {
         backend.rpcEnv.shutdown()
@@ -424,11 +426,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       val tasksKilled = new TrieMap[Long, Boolean]()
       val tasksExecuted = new TrieMap[Long, Boolean]()
 
+      val resourcesAmounts = Map(GPU -> Map(
+        "0" -> ResourceAmountUtils.toInternalResource(0.15),
+        "1" -> ResourceAmountUtils.toInternalResource(0.76)))
+
       // Fake tasks with different taskIds.
       val taskDescriptions = (1 to numTasks).map {
         taskId => new TaskDescription(taskId, 2, "1", s"TASK $taskId", 19,
-          1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1,
-          Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
+          1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data)
       }
       assert(taskDescriptions.length == numTasks)
 
@@ -513,11 +518,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
       val tasksKilled = new TrieMap[Long, Boolean]()
       val tasksExecuted = new TrieMap[Long, Boolean]()
 
+      val resourcesAmounts = Map(GPU -> Map(
+        "0" -> ResourceAmountUtils.toInternalResource(0.15),
+        "1" -> ResourceAmountUtils.toInternalResource(0.76)))
+
       // Fake tasks with different taskIds.
       val taskDescriptions = (1 to numTasks).map {
         taskId => new TaskDescription(taskId, 2, "1", s"TASK $taskId", 19,
-          1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1,
-          Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
+          1, JobArtifactSet.emptyJobArtifactSet, new Properties, 1, resourcesAmounts, data)
       }
       assert(taskDescriptions.length == numTasks)
 
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 49e19dd2a00e..805e7ca46749 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -642,7 +642,7 @@ class ExecutorSuite extends SparkFunSuite
       JobArtifactSet.emptyJobArtifactSet,
       properties = new Properties,
       cpus = 1,
-      resources = immutable.Map[String, ResourceInformation](),
+      resources = Map.empty,
       serializedTask)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index be38315cd75f..8f68fd547fb3 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -357,12 +357,11 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
     var taskError = intercept[AssertionError] {
       rprof.require(new TaskResourceRequests().resource("gpu", 1.5))
     }.getMessage()
-    assert(taskError.contains("The resource amount 1.5 must be either <= 0.5, or a whole number."))
+    assert(taskError.contains("The resource amount 1.5 must be either <= 1.0, or a whole number."))
 
-    taskError = intercept[AssertionError] {
-      rprof.require(new TaskResourceRequests().resource("gpu", 0.7))
-    }.getMessage()
-    assert(taskError.contains("The resource amount 0.7 must be either <= 0.5, or a whole number."))
+    rprof.require(new TaskResourceRequests().resource("gpu", 0.7))
+    rprof.require(new TaskResourceRequests().resource("gpu", 1.0))
+    rprof.require(new TaskResourceRequests().resource("gpu", 2.0))
   }
 
   test("ResourceProfile has correct custom executor resources") {
@@ -393,6 +392,33 @@ class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
       "Task resources should have 1 custom resource")
   }
 
+  test("SPARK-45527 fractional TaskResourceRequests in ResourceProfile") {
+    val ereqs = new ExecutorResourceRequests().cores(6).resource("gpus", 6)
+    var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1)
+    new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+
+    treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5)
+    new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+
+    treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.7)
+
+    val msg = intercept[AssertionError] {
+      new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+    }.getMessage
+    assert(msg.contains("The task resource amount 0.7 must be either <= 0.5, 
or a whole number"))
+  }
+
+  test("SPARK-45527 fractional TaskResourceRequests in TaskResourceProfile") {
+    var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1)
+    new ResourceProfileBuilder().require(treqs).build()
+
+    treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.5)
+    new ResourceProfileBuilder().require(treqs).build()
+
+    treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.7)
+    new ResourceProfileBuilder().require(treqs).build()
+  }
+
   private def withMockSparkEnv(conf: SparkConf)(f: => Unit): Unit = {
     val previousEnv = SparkEnv.get
     val mockEnv = mock[SparkEnv]
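
The asymmetry between the two tests above comes from how slots are derived from a fractional amount when a full ResourceProfile (with executor requirements) is built. A rough sketch of that arithmetic, assuming slots per address are computed as floor(1 / amount); the actual check may differ:

``` scala
// Any fraction in (0.5, 1.0) collapses to a single slot, so part of the
// address can never be used by this profile.
def slotsPerAddress(taskAmount: Double): Int =
  if (taskAmount >= 1.0) 1 else math.floor(1.0 / taskAmount).toInt

assert(slotsPerAddress(0.5) == 2) // two tasks can share one address
assert(slotsPerAddress(0.7) == 1) // one task; 0.3 of the address is stranded
```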
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
index 20d6cc767158..7fd8a11463d8 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
@@ -26,6 +26,7 @@ import org.json4s.{DefaultFormats, Extraction}
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.TestUtils._
 import org.apache.spark.internal.config._
+import org.apache.spark.internal.config.Tests.RESOURCES_WARNING_TESTING
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.util.Utils
@@ -336,4 +337,250 @@ class ResourceUtilsSuite extends SparkFunSuite
     assert(error.contains("User is expecting to use resource: gpu, but " +
       "didn't specify a discovery script!"))
   }
+
+  test("SPARK-45527 warnOnWastedResources for ResourceProfile") {
+    val conf = new SparkConf()
+    conf.set("spark.executor.cores", "10")
+    conf.set("spark.task.cpus", "1")
+    conf.set(RESOURCES_WARNING_TESTING, true)
+
+    // cpu limiting task number = 10/1, gpu limiting task number = 1/0.1
+    var ereqs = new ExecutorResourceRequests().cores(10).resource("gpu", 1)
+    var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1)
+    var rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+    // no exception expected
+    warnOnWastedResources(rp, conf)
+
+    ereqs = new ExecutorResourceRequests().cores(10).resource("gpu", 1)
+    treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2)
+    rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+    // no exception expected
+    warnOnWastedResources(rp, conf)
+
+    ereqs = new ExecutorResourceRequests().cores(20).resource("gpu", 2)
+    treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2)
+    rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+    // no exception expected
+    warnOnWastedResources(rp, conf)
+
+    var msg: String = ""
+
+    // Test cpu limiting task number
+    // format: (executor.cores, task.cpus, executor.gpu, task.gpu, expected runnable tasks)
+    Seq(
+      (10, 2, 1, 0.1, 5), // cpu limiting task number=10/2=5, gpu limiting task number = 1/0.1 = 10
+      (10, 3, 1, 0.1, 3), // cpu limiting task number=10/3=3, gpu limiting task number = 1/0.1 = 10
+      (10, 4, 1, 0.1, 2), // cpu limiting task number=10/4=2, gpu limiting task number = 1/0.1 = 10
+      (10, 5, 1, 0.1, 2), // cpu limiting task number=10/5=2, gpu limiting task number = 1/0.1 = 10
+      (10, 6, 1, 0.1, 1), // cpu limiting task number=10/6=1, gpu limiting task number = 1/0.1 = 10
+      (10, 10, 1, 0.1, 1), // cpu limiting task number=10/10=1, gpu limiting task number = 1/0.1 = 10
+      (20, 7, 1, 0.1, 2), // cpu limiting task number=20/7=2, gpu limiting task number = 1/0.1 = 10
+      (30, 7, 1, 0.1, 4), // cpu limiting task number=30/7=4, gpu limiting task number = 1/0.1 = 10
+      (50, 14, 1, 0.1, 3) // cpu limiting task number=50/14=3, gpu limiting task number = 1/0.1 = 10
+    ).foreach { case (executorCores, taskCpus: Int, executorGpus: Int, taskGpus: Double,
+    expectedTaskNumber: Int) =>
+      ereqs = new ExecutorResourceRequests().cores(executorCores).resource("gpu", executorGpus)
+      treqs = new TaskResourceRequests().cpus(taskCpus).resource("gpu", taskGpus)
+      rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+      msg = intercept[SparkException] {
+        warnOnWastedResources(rp, conf)
+      }.getMessage
+      assert(msg.contains("The configuration of resource: gpu (exec = 1, task = 0.1/10, runnable " +
+        "tasks = 10) will result in wasted resources due to resource cpus limiting the number of " +
+        s"runnable tasks per executor to: ${expectedTaskNumber}. Please adjust your configuration.")
+      )
+    }
+
+    // Test gpu limiting task number
+    // format: (executor.cores, task.cpus, executor.gpu, task.gpu, expected runnable tasks)
+    Seq(
+      (10, 1, 1, 1.0/9, 9), // cpu limiting task number=10, gpu limiting task number = 9
+      (10, 1, 1, 1.0/8, 8), // cpu limiting task number=10, gpu limiting task number = 8
+      (10, 1, 1, 1.0/7, 7), // cpu limiting task number=10, gpu limiting task number = 7
+      (10, 1, 1, 1.0/6, 6), // cpu limiting task number=10, gpu limiting task number = 6
+      (10, 1, 1, 1.0/5, 5), // cpu limiting task number=10, gpu limiting task number = 5
+      (10, 1, 1, 1.0/4, 4), // cpu limiting task number=10, gpu limiting task number = 4
+      (10, 1, 1, 1.0/3, 3), // cpu limiting task number=10, gpu limiting task number = 3
+      (10, 1, 1, 1.0/2, 2), // cpu limiting task number=10, gpu limiting task number = 2
+      (10, 1, 1, 1.0, 1), // cpu limiting task number=10, gpu limiting task number = 1
+      (30, 1, 2, 1.0/9, 2*9), // cpu limiting task number=30, gpu limiting task number = 2*9
+      (30, 1, 2, 1.0/8, 2*8), // cpu limiting task number=30, gpu limiting task number = 2*8
+      (30, 1, 2, 1.0/7, 2*7), // cpu limiting task number=30, gpu limiting task number = 2*7
+      (30, 1, 2, 1.0/6, 2*6), // cpu limiting task number=30, gpu limiting task number = 2*6
+      (30, 1, 2, 1.0/5, 2*5), // cpu limiting task number=30, gpu limiting task number = 2*5
+      (30, 1, 2, 1.0/4, 2*4), // cpu limiting task number=30, gpu limiting task number = 2*4
+      (30, 1, 2, 1.0/3, 2*3), // cpu limiting task number=30, gpu limiting task number = 2*3
+      (30, 1, 2, 1.0/2, 2*2), // cpu limiting task number=30, gpu limiting task number = 2*2
+      (30, 1, 2, 1.0, 2*1), // cpu limiting task number=30, gpu limiting task number = 2*1
+      (30, 1, 2, 2.0, 1), // cpu limiting task number=30, gpu limiting task number = 1
+      (70, 2, 7, 0.5, 7*2), // cpu limiting task number=70/2=35, gpu limiting task number = 7*1/0.5 = 14
+      (80, 3, 9, 2.0, 9/2) // cpu limiting task number=80/3=26, gpu limiting task number = 9/2 = 4
+    ).foreach { case (executorCores, taskCpus: Int, executorGpus: Int, taskGpus: Double,
+    expectedTaskNumber: Int) =>
+      ereqs = new ExecutorResourceRequests().cores(executorCores).resource("gpu", executorGpus)
+      treqs = new TaskResourceRequests().cpus(taskCpus).resource("gpu", taskGpus)
+      rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()
+      msg = intercept[SparkException] {
+        warnOnWastedResources(rp, conf)
+      }.getMessage
+      assert(msg.contains(s"The configuration of cores (exec = ${executorCores} task = " +
+        s"${taskCpus}, runnable tasks = ${executorCores/taskCpus}) " +
+        "will result in wasted resources due to resource gpu limiting the number of runnable " +
+        s"tasks per executor to: ${expectedTaskNumber}. Please adjust your configuration"))
+    }
+  }
+  }
+
+  private class FakedTaskResourceProfile(
+      val defaultRp: ResourceProfile,
+      override val taskResources: Map[String, TaskResourceRequest])
+    extends TaskResourceProfile(taskResources) {
+    override protected[spark] def getCustomExecutorResources()
+      : Map[String, ExecutorResourceRequest] = defaultRp.getCustomExecutorResources()
+  }
+
+  test("SPARK-45527 warnOnWastedResources for TaskResourceProfile when 
executor number = 1") {
+    val conf = new SparkConf()
+    conf.set("spark.executor.cores", "10")
+    conf.set("spark.task.cpus", "1")
+    conf.set(TASK_GPU_ID.amountConf, "0.1")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "1")
+    conf.set(RESOURCES_WARNING_TESTING, true)
+
+    val defaultDp = ResourceProfile.getOrCreateDefaultProfile(conf)
+
+    // cpu limiting task number = 10/1, gpu limiting task number = 1/0.1
+    var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1)
+    var rp = new FakedTaskResourceProfile(defaultDp, treqs.requests)
+    // no exception expected
+    warnOnWastedResources(rp, conf)
+
+    var msg: String = ""
+
+    // Test cpu limiting task number
+    // format: (task cpu cores, task gpu amount, expected runnable tasks)
+    // spark.executor.cores=10, spark.task.cpus=1, EXECUTOR_GPU_ID=1, TASK_GPU_ID=0.1
+    Seq(
+      (2, 0.1, 5), // cpu limiting task number = 10/2 = 5, gpu limiting task number = 1/0.1 = 10
+      (3, 0.1, 3), // cpu limiting task number = 10/3 = 3, gpu limiting task number = 1/0.1 = 10
+      (4, 0.1, 2), // cpu limiting task number = 10/4 = 2, gpu limiting task number = 1/0.1 = 10
+      (5, 0.1, 2), // cpu limiting task number = 10/5 = 2, gpu limiting task number = 1/0.1 = 10
+      (6, 0.1, 1), // cpu limiting task number = 10/6 = 1, gpu limiting task number = 1/0.1 = 10
+      (7, 0.1, 1), // cpu limiting task number = 10/7 = 1, gpu limiting task number = 1/0.1 = 10
+      (10, 0.1, 1) // cpu limiting task number = 10/10 = 1, gpu limiting task number = 1/0.1 = 10
+    ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) =>
+      treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus)
+      rp = new FakedTaskResourceProfile(defaultDp, treqs.requests)
+      msg = intercept[SparkException] {
+        warnOnWastedResources(rp, conf)
+      }.getMessage
+      assert(msg.contains("The configuration of resource: gpu (exec = 1, task = 0.1/10, runnable " +
+        "tasks = 10) will result in wasted resources due to resource cpus limiting the number of " +
+        s"runnable tasks per executor to: $expectedTaskNumber. Please adjust your configuration.")
+      )
+    }
+
+    // Test gpu limiting task number
+    // format: (task cpu cores, task gpu amount, expected runnable tasks)
+    // spark.executor.cores=10, spark.task.cpus=1, EXECUTOR_GPU_ID=1, TASK_GPU_ID=0.1
+    Seq(
+      (1, 0.111, 9), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.111=9
+      (1, 0.125, 8), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.125=8
+      (1, 0.142, 7), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.142=7
+      (1, 0.166, 6), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.166=6
+      (1, 0.2, 5), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.2=5
+      (1, 0.25, 4), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.25=4
+      (1, 0.333, 3), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.333=3
+      (1, 0.5, 2), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.5=2
+      (1, 0.6, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.6=1
+      (1, 0.7, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.7=1
+      (1, 0.8, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.8=1
+      (1, 0.9, 1), // cpu limiting task number = 10/1, gpu limiting task number = 1/0.9=1
+      (1, 1.0, 1) // cpu limiting task number = 10/1, gpu limiting task number = 1/1.0=1
+    ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) =>
+      treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus)
+      rp = new FakedTaskResourceProfile(defaultDp, treqs.requests)
+      msg = intercept[SparkException] {
+        warnOnWastedResources(rp, conf)
+      }.getMessage
+      assert(msg.contains("The configuration of cores (exec = 10 task = 1, runnable tasks = 10) " +
+        "will result in wasted resources due to resource gpu limiting the number of runnable " +
+        s"tasks per executor to: $expectedTaskNumber. Please adjust your configuration"))
+    }
+  }
+
+  test("SPARK-45527 warnOnWastedResources for TaskResourceProfile when 
executor number > 1") {
+    val conf = new SparkConf()
+    conf.set("spark.executor.cores", "60")
+    conf.set("spark.task.cpus", "1")
+    conf.set(TASK_GPU_ID.amountConf, "0.1")
+    conf.set(EXECUTOR_GPU_ID.amountConf, "6")
+    conf.set(RESOURCES_WARNING_TESTING, true)
+
+    val defaultDp = ResourceProfile.getOrCreateDefaultProfile(conf)
+
+    // cpu limiting task number = 60/1, gpu limiting task number = 6/0.1
+    var treqs = new TaskResourceRequests().cpus(1).resource("gpu", 0.1)
+    var rp = new FakedTaskResourceProfile(defaultDp, treqs.requests)
+    // no exception expected
+    warnOnWastedResources(rp, conf)
+
+    // cpu limiting task number = 60/2 = 30, gpu limiting task number = 6/0.2 = 30
+    treqs = new TaskResourceRequests().cpus(2).resource("gpu", 0.2)
+    rp = new FakedTaskResourceProfile(defaultDp, treqs.requests)
+    // no exception expected
+    warnOnWastedResources(rp, conf)
+
+    var msg: String = ""
+
+    // Test cpu limiting task number
+    // format: (task cpu cores, task gpu amount, expected runnable tasks)
+    // spark.executor.cores=60, spark.task.cpus=1, EXECUTOR_GPU_ID=6, TASK_GPU_ID=0.1
+    Seq(
+      (4, 0.2, 15), // cpu limiting task number = 60/4 = 15, gpu limiting task number = 6/0.2 = 30
+      (7, 0.2, 8), // cpu limiting task number = 60/7 = 8, gpu limiting task number = 6/0.2 = 30
+      (30, 0.2, 2), // cpu limiting task number = 60/30 = 2, gpu limiting task number = 6/0.2 = 30
+      (31, 0.2, 1), // cpu limiting task number = 60/31 = 1, gpu limiting task number = 6/0.2 = 30
+      (55, 0.2, 1), // cpu limiting task number = 60/55 = 1, gpu limiting task number = 6/0.2 = 30
+      (60, 0.2, 1) // cpu limiting task number = 60/60 = 1, gpu limiting task number = 6/0.2 = 30
+    ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) =>
+      treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus)
+      rp = new FakedTaskResourceProfile(defaultDp, treqs.requests)
+      msg = intercept[SparkException] {
+        warnOnWastedResources(rp, conf)
+      }.getMessage
+      assert(msg.contains("The configuration of resource: gpu (exec = 6, task = 0.2/5, runnable " +
+        "tasks = 30) will result in wasted resources due to resource cpus limiting the number of " +
+        s"runnable tasks per executor to: $expectedTaskNumber. Please adjust your configuration.")
+      )
+    }
+
+    // Test gpu limiting task number
+    // format: (task cpu cores, task gpu amount, expected runnable tasks)
+    // spark.executor.cores=60, spark.task.cpus=1, EXECUTOR_GPU_ID=6, TASK_GPU_ID=0.1
+    Seq(
+      (1, 0.111, 54), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.111=54
+      (1, 0.125, 48), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.125=48
+      (1, 0.142, 42), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.142=42
+      (1, 0.166, 36), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.166=36
+      (1, 0.2, 30), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.2=30
+      (1, 0.25, 24), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.25=24
+      (1, 0.33, 18), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.33=18
+      (1, 0.5, 12), // cpu limiting task number = 60/1, gpu limiting task number = 6*1/0.5=12
+      (1, 0.7, 6), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6*1/0.7 = 6
+      (1, 1.0, 6), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/1 = 6
+      (1, 2.0, 3), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/2 = 3
+      (1, 3.0, 2), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/3 = 2
+      (1, 4.0, 1), // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/4 = 1
+      (1, 6.0, 1) // cpu limiting task number = 60/1 = 60, gpu limiting task number = 6/6 = 1
+    ).foreach { case (cores: Int, gpus: Double, expectedTaskNumber: Int) =>
+      treqs = new TaskResourceRequests().cpus(cores).resource("gpu", gpus)
+      rp = new FakedTaskResourceProfile(defaultDp, treqs.requests)
+      msg = intercept[SparkException] {
+        warnOnWastedResources(rp, conf)
+      }.getMessage
+      assert(msg.contains("The configuration of cores (exec = 60 task = 1, runnable tasks = 60) " +
+        "will result in wasted resources due to resource gpu limiting the number of runnable " +
+        s"tasks per executor to: $expectedTaskNumber. Please adjust your configuration"))
+    }
+  }
 }
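
The expected task numbers in the comments above all follow a single min() computation. Here is a compact sketch of that arithmetic, under the assumption that fractional GPU requests pack floor(1 / taskGpu) tasks onto each address; the helper name is illustrative:

``` scala
def runnableTasks(execCores: Int, taskCpus: Int, execGpus: Int, taskGpus: Double): Int = {
  val cpuLimit = execCores / taskCpus // integer division, as in the comments
  val gpuLimit =
    if (taskGpus >= 1.0) execGpus / taskGpus.toInt
    else execGpus * math.floor(1.0 / taskGpus).toInt
  math.min(cpuLimit, gpuLimit)
}

assert(runnableTasks(60, 4, 6, 0.2) == 15)  // cpu-limited: 60/4
assert(runnableTasks(60, 1, 6, 0.25) == 24) // gpu-limited: 6 * 4
```

Whichever limit is smaller determines the runnable task count; warnOnWastedResources flags the configuration when the other resource is left idle.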
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index e9b8ae4bffe6..1b444aa60474 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -37,6 +37,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Network.RPC_MESSAGE_MAX_SIZE
 import org.apache.spark.rdd.RDD
 import org.apache.spark.resource.{ExecutorResourceRequests, ResourceInformation, ResourceProfile, TaskResourceRequests}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
@@ -254,7 +255,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
     assert(exec3ResourceProfileId === rp.id)
 
-    val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+    val taskResources = Map(GPU -> Map("0" -> ONE_ENTIRE_RESOURCE))
     val taskCpus = 1
     val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
       "t1", 0, 1, JobArtifactSet.emptyJobArtifactSet, new Properties(),
@@ -361,7 +362,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
     assert(exec3ResourceProfileId === rp.id)
 
-    val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
+    val taskResources = Map(GPU -> Map("0" -> ONE_ENTIRE_RESOURCE))
     val taskCpus = 1
     val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
       "t1", 0, 1, JobArtifactSet.emptyJobArtifactSet, new Properties(),
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
index 203e30cac1a7..ac1a6b7c3ec0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
@@ -18,74 +18,87 @@
 package org.apache.spark.scheduler
 
 import scala.collection.mutable.ArrayBuffer
+import scala.language.implicitConversions
 
 import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.resource.ResourceAmountUtils
 import org.apache.spark.resource.ResourceUtils.GPU
-import org.apache.spark.util.ArrayImplicits._
 
 class ExecutorResourceInfoSuite extends SparkFunSuite {
 
+  implicit def convertMapLongToDouble(resources: Map[String, Long]): Map[String, Double] = {
+    resources.map { case (k, v) => k -> ResourceAmountUtils.toFractionalResource(v) }
+  }
+
+  implicit def convertMapDoubleToLong(resources: Map[String, Double]): Map[String, Long] = {
+    resources.map { case (k, v) => k -> ResourceAmountUtils.toInternalResource(v) }
+  }
+
   test("Track Executor Resource information") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
+    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"))
     assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3"))
     assert(info.assignedAddrs.isEmpty)
 
+    val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap
     // Acquire addresses
-    info.acquire(Array("0", "1").toImmutableArraySeq)
+    info.acquire(reqResource)
     assert(info.availableAddrs.sorted sameElements Seq("2", "3"))
     assert(info.assignedAddrs.sorted sameElements Seq("0", "1"))
 
     // release addresses
-    info.release(Array("0", "1").toImmutableArraySeq)
+    info.release(reqResource)
     assert(info.availableAddrs.sorted sameElements Seq("0", "1", "2", "3"))
     assert(info.assignedAddrs.isEmpty)
   }
 
   test("Don't allow acquire address that is not available") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
+    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"))
     // Acquire some addresses.
-    info.acquire(Seq("0", "1"))
+    val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap
+    info.acquire(reqResource)
     assert(!info.availableAddrs.contains("1"))
     // Acquire an address that is not available
     val e = intercept[SparkException] {
-      info.acquire(Array("1").toImmutableArraySeq)
+      info.acquire(convertMapDoubleToLong(Map("1" -> 1.0)))
     }
-    assert(e.getMessage.contains("Try to acquire an address that is not 
available."))
+    assert(e.getMessage.contains("Try to acquire gpu address 1 amount: 1.0, 
but only 0.0 left."))
   }
 
   test("Don't allow acquire address that doesn't exist") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
+    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"))
     assert(!info.availableAddrs.contains("4"))
     // Acquire an address that doesn't exist
     val e = intercept[SparkException] {
-      info.acquire(Array("4").toImmutableArraySeq)
+      info.acquire(convertMapDoubleToLong(Map("4" -> 1.0)))
     }
     assert(e.getMessage.contains("Try to acquire an address that doesn't 
exist."))
   }
 
   test("Don't allow release address that is not assigned") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
+    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"))
     // Acquire addresses
-    info.acquire(Array("0", "1").toImmutableArraySeq)
+    val reqResource = Seq("0", "1").map(addrs => addrs -> 1.0).toMap
+    info.acquire(reqResource)
     assert(!info.assignedAddrs.contains("2"))
     // Release an address that is not assigned
     val e = intercept[SparkException] {
-      info.release(Array("2").toImmutableArraySeq)
+      info.release(convertMapDoubleToLong(Map("2" -> 1.0)))
     }
-    assert(e.getMessage.contains("Try to release an address that is not 
assigned."))
+    assert(e.getMessage.contains("Try to release gpu address 2 amount: 1.0. " +
+      "But the total amount: 2.0 after release should be <= 1"))
   }
 
   test("Don't allow release address that doesn't exist") {
     // Init Executor Resource.
-    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"), 1)
+    val info = new ExecutorResourceInfo(GPU, Seq("0", "1", "2", "3"))
     assert(!info.assignedAddrs.contains("4"))
     // Release an address that doesn't exist
     val e = intercept[SparkException] {
-      info.release(Array("4").toImmutableArraySeq)
+      info.release(convertMapDoubleToLong(Map("4" -> 1.0)))
     }
     assert(e.getMessage.contains("Try to release an address that doesn't 
exist."))
   }
@@ -94,23 +107,92 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
     val slotSeq = Seq(10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
     val addresses = ArrayBuffer("0", "1", "2", "3")
     slotSeq.foreach { slots =>
-      val info = new ExecutorResourceInfo(GPU, addresses.toSeq, slots)
+      val taskAmount = 1.0 / slots
+      val info = new ExecutorResourceInfo(GPU, addresses.toSeq)
       for (_ <- 0 until slots) {
-        addresses.foreach(addr => info.acquire(Seq(addr)))
+        addresses.foreach(addr => info.acquire(convertMapDoubleToLong(Map(addr -> taskAmount))))
       }
 
-      // assert that each address was assigned `slots` times
-      info.assignedAddrs
-        .groupBy(identity)
-        .transform((_, v) => v.size)
-        .foreach(x => assert(x._2 == slots))
+      // All addresses have been assigned
+      assert(info.resourcesAmounts.values.toSeq.toSet.size == 1)
+      // The remaining amount of any address should be < taskAmount
+      assert(ResourceAmountUtils.toFractionalResource(info.resourcesAmounts("0")) < taskAmount)
 
       addresses.foreach { addr =>
         assertThrows[SparkException] {
-          info.acquire(Seq(addr))
+          info.acquire(convertMapDoubleToLong(Map(addr -> taskAmount)))
         }
-        assert(!info.availableAddrs.contains(addr))
       }
     }
   }
+
+  def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double],
+                  eps: Double = 0.00000001): Boolean = {
+    lhs.size == rhs.size &&
+      lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) =>
+        lName == rName && (lAmount - rAmount).abs < eps
+      }
+  }
+
+  test("assign/release resource for different task requirements") {
+    val execInfo = new ExecutorResourceInfo("gpu", Seq("0", "1", "2", "3"))
+
+    def testAllocation(taskAddressAmount: Map[String, Double],
+                       expectedLeftRes: Map[String, Double]
+                      ): Unit = {
+      execInfo.acquire(taskAddressAmount)
+      val leftRes = execInfo.resourcesAmounts
+      assert(compareMaps(leftRes, expectedLeftRes))
+    }
+
+    def testRelease(releasedRes: Map[String, Double],
+                    expectedLeftRes: Map[String, Double]
+                   ): Unit = {
+      execInfo.release(releasedRes)
+      val leftRes = execInfo.resourcesAmounts
+      assert(compareMaps(leftRes, expectedLeftRes))
+    }
+
+    testAllocation(taskAddressAmount = Map("0" -> 0.2),
+      expectedLeftRes = Map("0" -> 0.8, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    testAllocation(taskAddressAmount = Map("0" -> 0.2),
+      expectedLeftRes = Map("0" -> 0.6, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    testAllocation(taskAddressAmount = Map("1" -> 1.0, "2" -> 1.0),
+      expectedLeftRes = Map("0" -> 0.6, "1" -> 0.0, "2" -> 0.0, "3" -> 1.0))
+
+    testRelease(releasedRes = Map("0" -> 0.1, "2" -> 0.8),
+      expectedLeftRes = Map("0" -> 0.7, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0))
+
+    testAllocation(taskAddressAmount = Map("0" -> 0.50002),
+      expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 
1.0))
+
+    testAllocation(taskAddressAmount = Map("3" -> 1.0),
+      expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 
0.0))
+
+    testAllocation(taskAddressAmount = Map("2" -> 0.2),
+      expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.6, "3" -> 
0.0))
+
+    testRelease(releasedRes = Map("0" -> 0.80002, "1" -> 1.0, "2" -> 0.4, "3" 
-> 1.0),
+      expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    testAllocation(taskAddressAmount = Map("0" -> 1.0),
+      expectedLeftRes = Map("0" -> 0.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    testRelease(releasedRes = Map("0" -> 1.0),
+      expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    testAllocation(taskAddressAmount = Map("0" -> 1.0, "1" -> 1.0),
+      expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 1.0, "3" -> 1.0))
+
+    testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0),
+      expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    testAllocation(taskAddressAmount = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, 
"3" -> 1.0),
+      expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 0.0, "3" -> 0.0))
+
+    testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 
1.0),
+      expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+  }
 }
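
The toInternalResource/toFractionalResource helpers used throughout these suites convert between user-facing Doubles and Longs. Here is a standalone sketch of the presumed idea, with an assumed scale constant (Spark's actual constant may differ):

``` scala
// Storing fractions as fixed-point Longs avoids Double rounding drift when
// amounts like 0.1 are acquired and released many times.
object FixedPointSketch {
  val OneEntireResource = 10000L * 10000L * 10000L // assumed scale, 10^12

  def toInternal(amount: Double): Long = (amount * OneEntireResource).toLong
  def toFractional(amount: Long): Double = amount.toDouble / OneEntireResource

  def main(args: Array[String]): Unit = {
    var left = toInternal(1.0)
    for (_ <- 0 until 10) left -= toInternal(0.1) // exact in fixed point
    assert(left == 0L)
    println(toFractional(left)) // 0.0, no residual error
  }
}
```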
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
new file mode 100644
index 000000000000..e512327fefa7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourcesAmountsSuite.scala
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.collection.mutable.ArrayBuffer
+import scala.language.implicitConversions
+
+import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.resource.{ResourceAmountUtils, ResourceProfileBuilder, TaskResourceRequests}
+import org.apache.spark.resource.ResourceUtils.GPU
+
+class ExecutorResourcesAmountsSuite extends SparkFunSuite {
+
+  implicit def toFractionalResource(resources: Map[String, Long]): Map[String, Double] =
+    resources.map { case (k, v) => k -> ResourceAmountUtils.toFractionalResource(v) }
+
+  implicit def toInternalResource(resources: Map[String, Double]): Map[String, Long] =
+    resources.map { case (k, v) => k -> ResourceAmountUtils.toInternalResource(v) }
+
+  implicit def toInternalResourceMap(resources: Map[String, Map[String, Double]]):
+      Map[String, Map[String, Long]] =
+    resources.map { case (resName, addressesAmountMap) =>
+      resName -> addressesAmountMap.map { case (k, v) =>
+        k -> ResourceAmountUtils.toInternalResource(v) }
+  }
+
+  def compareMaps(lhs: Map[String, Double], rhs: Map[String, Double],
+                  eps: Double = 0.00000001): Boolean = {
+    lhs.size == rhs.size &&
+      lhs.zip(rhs).forall { case ((lName, lAmount), (rName, rAmount)) =>
+        lName == rName && (lAmount - rAmount).abs < eps
+      }
+  }
+
+  test("assign to rp without task resources requirement") {
+    val executorsInfo = Map(
+      "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")),
+      "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb"))
+    )
+    val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo)
+    assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2))
+
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp = new ResourceProfileBuilder().require(treqs).build()
+
+    // nothing is assigned for an rp without custom task resource requirements
+    val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
+    assert(assigned.isDefined)
+    assigned.foreach { case resource => assert(resource.isEmpty) }
+  }
+
+  test("Convert ExecutorResourceInfos to ExecutorResourcesAmounts") {
+    val executorsInfo = Map(
+      "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")),
+      "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb"))
+    )
+    // default resources amounts of executors info
+    executorsInfo.foreach { case (rName, rInfo) =>
+      if (rName == "gpu") {
+        assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+
+    val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo)
+    assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2))
+
+    val availableRes = availableExecResAmounts.availableResources
+    availableRes.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+
+    // Update the executors info in place.
+    executorsInfo.foreach { case (rName, rInfo) =>
+      if (rName == "gpu") {
+        rInfo.acquire(toInternalResource(Map("2" -> 0.4, "6" -> 0.6)))
+      } else {
+        rInfo.acquire(toInternalResource(Map("aa" -> 0.2, "bb" -> 0.7)))
+      }
+    }
+
+    executorsInfo.foreach { case (rName, rInfo) =>
+      if (rName == "gpu") {
+        assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 0.6, "4" -> 1.0, "6" -> 0.4)))
+      } else {
+        assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 0.8, "bb" -> 0.3)))
+      }
+    }
+
+    val availableExecResAmounts1 = ExecutorResourcesAmounts(executorsInfo)
+    assert(availableExecResAmounts1.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2))
+
+    val availableRes1 = availableExecResAmounts1.availableResources
+    availableRes1.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount, Map("2" -> 0.6, "4" -> 1.0, "6" -> 0.4)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> 0.8, "bb" -> 0.3)))
+      }
+    }
+
+  }
+
+  test("ExecutorResourcesAmounts shouldn't change ExecutorResourceInfo") {
+    val executorsInfo = Map(
+      "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")),
+      "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb"))
+    )
+
+    // default resources amounts of executors info
+    executorsInfo.foreach { case (rName, rInfo) =>
+      if (rName == "gpu") {
+        assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+
+    val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo)
+    assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2))
+
+    val gpuTaskAmount = 0.1
+    val treqs = new TaskResourceRequests().resource("gpu", gpuTaskAmount)
+    val rp = new ResourceProfileBuilder().require(treqs).build()
+
+    // taskAmount = 0.1 < 1.0, so it can be assigned.
+    val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
+    // update the value
+    availableExecResAmounts.acquire(assigned.get)
+
+    val availableRes = availableExecResAmounts.availableResources
+    availableRes.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount,
+          Map("2" -> (1.0 - gpuTaskAmount), "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+
+    // executors info shouldn't be changed.
+    executorsInfo.foreach { case (rName, rInfo) =>
+      if (rName == "gpu") {
+        assert(compareMaps(rInfo.resourcesAmounts, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(rInfo.resourcesAmounts, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+  }
+
+  test("executor resources are not matching to the task requirement") {
+    val totalRes = Map("gpu" -> Map("2" -> 0.4))
+    val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes)
+
+    val gpuTaskAmount = 0.6
+    val treqs = new TaskResourceRequests()
+      .resource("gpu", gpuTaskAmount)
+    val rp = new ResourceProfileBuilder().require(treqs).build()
+
+    val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
+    assert(assigned.isEmpty)
+  }
+
+  test("part of executor resources are not matching to the task requirement") {
+    val totalRes = Map("gpu" -> Map("2" -> 0.4), "fpga" -> Map("aa" -> 0.8))
+    val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes)
+
+    // normal allocation
+    val gpuTaskAmount = 0.3
+    val fpgaTaskAmount = 0.8
+    val treqs = new TaskResourceRequests()
+      .resource("gpu", gpuTaskAmount)
+      .resource("fpga", fpgaTaskAmount)
+    val rp = new ResourceProfileBuilder().require(treqs).build()
+
+    var assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
+    assert(!assigned.isEmpty)
+    assigned.foreach { case resource => assert(!resource.isEmpty)}
+
+    val treqs1 = new TaskResourceRequests()
+      .resource("gpu", gpuTaskAmount)
+      .resource("fpga", 0.9) // couldn't allocate fpga
+    val rp1 = new ResourceProfileBuilder().require(treqs1).build()
+
+    assigned = availableExecResAmounts.assignAddressesCustomResources(rp1)
+    assert(assigned.isEmpty)
+  }
+
+  test("the total amount after release should be <= 1.0") {
+    val totalRes = Map("gpu" -> Map("2" -> 0.4))
+    val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes)
+
+    val e = intercept[SparkException] {
+      availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.7))))
+    }
+    assert(e.getMessage.contains("after releasing gpu address 2 should be <= 1.0"))
+
+    availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.6))))
+    assert(compareMaps(availableExecResAmounts.availableResources("gpu"), Map("2" -> 1.0)))
+  }
+
+  test("the total amount after acquire should be >= 0") {
+    val totalRes = Map("gpu" -> Map("2" -> 0.4))
+    val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes)
+
+    val e = intercept[SparkException] {
+      availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.6))))
+    }
+    assert(e.getMessage.contains("after acquiring gpu address 2 should be >= 0"))
+
+    availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> Map("2" -> 0.4))))
+    assert(compareMaps(availableExecResAmounts.availableResources("gpu"), Map("2" -> 0.0)))
+  }
+
+  test("Ensure that we can acquire the same fractions of a resource") {
+    val slotSeq = Seq(31235, 1024, 512, 256, 128, 64, 32, 16, 12, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1)
+    val addresses = ArrayBuffer("0", "1", "2", "3")
+    val info = new ExecutorResourceInfo(GPU, addresses.toSeq)
+
+    slotSeq.foreach { slots =>
+      val taskAmount = 1.0 / slots
+      val availableExecResAmounts = ExecutorResourcesAmounts(Map(GPU -> info))
+      for (_ <- 0 until slots) {
+        addresses.foreach(addr =>
+          availableExecResAmounts.acquire(
+            toInternalResourceMap(Map(GPU -> Map(addr -> taskAmount)))))
+      }
+
+      assert(availableExecResAmounts.availableResources.size === 1)
+      // All addresses have been assigned
+      assert(availableExecResAmounts.availableResources(GPU).values.toSeq.toSet.size === 1)
+      // The remaining amount of any address should be < taskAmount
+      assert(availableExecResAmounts.availableResources(GPU)("0") < taskAmount)
+
+      addresses.foreach { addr =>
+        assertThrows[SparkException] {
+          availableExecResAmounts.acquire(
+            toInternalResourceMap(Map(GPU -> Map(addr -> taskAmount))))
+        }
+      }
+    }
+  }
+
+  test("assign acquire release on single task resource request") {
+    val executorsInfo = Map(
+      "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")),
+      "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb"))
+    )
+
+    val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo)
+
+    assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2))
+
+    val gpuTaskAmount = 0.1
+    val treqs = new TaskResourceRequests().resource("gpu", gpuTaskAmount)
+    val rp = new ResourceProfileBuilder().require(treqs).build()
+
+    // taskAmount = 0.1 < 1.0, so it can be assigned.
+    val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
+    assert(!assigned.isEmpty)
+    assigned.foreach { case resource =>
+      assert(resource.size === 1)
+      assert(resource.keys.toSeq === Seq("gpu"))
+      assert(resource("gpu").size === 1)
+      assert(resource("gpu").keys.toSeq === Seq("2"))
+      assert(ResourceAmountUtils.toFractionalResource(resource("gpu")("2")) === gpuTaskAmount)
+    }
+
+    // assign will not update the real value.
+    var availableRes = availableExecResAmounts.availableResources
+    availableRes.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+
+    // acquire updates the value.
+    availableExecResAmounts.acquire(assigned.get)
+
+    // after acquire
+    availableRes = availableExecResAmounts.availableResources
+    availableRes.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount, Map(
+          "2" -> (1.0 - gpuTaskAmount),
+          "4" -> 1.0,
+          "6" -> 1.0)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+
+    // release
+    availableExecResAmounts.release(assigned.get)
+
+    availableRes = availableExecResAmounts.availableResources
+    availableRes.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+  }
+
+  test("assign acquire release on multiple task resources request") {
+    val executorsInfo = Map(
+      "gpu" -> new ExecutorResourceInfo("gpu", Seq("2", "4", "6")),
+      "fpga" -> new ExecutorResourceInfo("fpga", Seq("aa", "bb"))
+    )
+
+    val availableExecResAmounts = ExecutorResourcesAmounts(executorsInfo)
+
+    assert(availableExecResAmounts.resourceAddressAmount === Map("gpu" -> 3, "fpga" -> 2))
+
+    val gpuTaskAmount = 0.1
+    val fpgaTaskAmount = 0.3
+    val treqs = new TaskResourceRequests()
+      .resource("gpu", gpuTaskAmount)
+      .resource("fpga", fpgaTaskAmount)
+    val rp = new ResourceProfileBuilder().require(treqs).build()
+
+    // taskAmount = 0.1 < 1.0, so it can be assigned.
+    val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
+    assert(!assigned.isEmpty)
+    assigned.foreach { case resourceAmounts =>
+      assert(resourceAmounts.size === 2)
+      assert(resourceAmounts.keys.toSeq.sorted === Seq("gpu", "fpga").sorted)
+
+      assert(resourceAmounts("gpu").size === 1)
+      assert(resourceAmounts("gpu").keys.toSeq === Seq("2"))
+      assert(ResourceAmountUtils.toFractionalResource(resourceAmounts("gpu")("2")) ===
+        gpuTaskAmount)
+
+      assert(resourceAmounts("fpga").size === 1)
+      assert(resourceAmounts("fpga").keys.toSeq === Seq("aa"))
+      assert(ResourceAmountUtils.toFractionalResource(resourceAmounts("fpga")("aa")) ===
+        fpgaTaskAmount)
+    }
+
+    // assign will not update the real value.
+    var availableRes = availableExecResAmounts.availableResources
+    availableRes.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+
+    // acquire updates the value.
+    availableExecResAmounts.acquire(assigned.get)
+
+    // after acquire
+    availableRes = availableExecResAmounts.availableResources
+    availableRes.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount,
+          Map("2" -> (1.0 - gpuTaskAmount), "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> (1.0 - fpgaTaskAmount), "bb" -> 1.0)))
+      }
+    }
+
+    // release
+    availableExecResAmounts.release(assigned.get)
+
+    availableRes = availableExecResAmounts.availableResources
+    availableRes.foreach { case (rName, addressesAmount) =>
+      if (rName == "gpu") {
+        assert(compareMaps(addressesAmount, Map("2" -> 1.0, "4" -> 1.0, "6" -> 1.0)))
+      } else {
+        assert(compareMaps(addressesAmount, Map("aa" -> 1.0, "bb" -> 1.0)))
+      }
+    }
+  }
+
+  test("assign/release resource for different task requirements") {
+    val totalRes = Map("gpu" -> Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+    val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes)
+
+    def testAllocation(taskAmount: Double,
+                       expectedAssignedAddress: Array[String],
+                       expectedAssignedAmount: Array[Double],
+                       expectedLeftRes: Map[String, Double]
+                      ): Unit = {
+      val treqs = new TaskResourceRequests().resource("gpu", taskAmount)
+      val rp = new ResourceProfileBuilder().require(treqs).build()
+      val assigned = availableExecResAmounts.assignAddressesCustomResources(rp)
+      assert(!assigned.isEmpty)
+      assigned.foreach { case resources =>
+        assert(
+          resources("gpu").values.toArray.sorted.map(ResourceAmountUtils.toFractionalResource(_))
+          === expectedAssignedAmount.sorted)
+
+        availableExecResAmounts.acquire(resources)
+
+        val leftRes = availableExecResAmounts.availableResources
+        assert(leftRes.size == 1)
+        assert(leftRes.keys.toSeq(0) == "gpu")
+        assert(compareMaps(leftRes("gpu"), expectedLeftRes))
+      }
+    }
+
+    def testRelease(releasedRes: Map[String, Double],
+                    expectedLeftRes: Map[String, Double]
+                   ): Unit = {
+      availableExecResAmounts.release(Map("gpu" -> releasedRes))
+
+      val leftRes = availableExecResAmounts.availableResources
+      assert(leftRes.size == 1)
+      assert(leftRes.keys.toSeq(0) == "gpu")
+      assert(compareMaps(leftRes("gpu"), expectedLeftRes))
+    }
+
+    // request 0.2 gpu; ExecutorResourcesAmounts should assign "0"
+    testAllocation(taskAmount = 0.2,
+      expectedAssignedAddress = Array("0"),
+      expectedAssignedAmount = Array(0.2),
+      expectedLeftRes = Map("0" -> 0.8, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    // request 0.2 gpu again; ExecutorResourcesAmounts should assign "0"
+    testAllocation(taskAmount = 0.2,
+      expectedAssignedAddress = Array("0"),
+      expectedAssignedAmount = Array(0.2),
+      expectedLeftRes = Map("0" -> 0.6, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    // request 2 gpus; ExecutorResourcesAmounts should assign "1" and "2"
+    testAllocation(taskAmount = 2,
+      expectedAssignedAddress = Array("1", "2"),
+      expectedAssignedAmount = Array(1.0, 1.0),
+      expectedLeftRes = Map("0" -> 0.6, "1" -> 0.0, "2" -> 0.0, "3" -> 1.0))
+
+    testRelease(releasedRes = Map("0" -> 0.1, "2" -> 0.8),
+      expectedLeftRes = Map("0" -> 0.7, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0))
+
+    // request 0.50002 gpu; ExecutorResourcesAmounts should assign "0"
+    testAllocation(taskAmount = 0.50002,
+      expectedAssignedAddress = Array("0"),
+      expectedAssignedAmount = Array(0.50002),
+      expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 1.0))
+
+    // request 1 gpu; ExecutorResourcesAmounts should assign "3"
+    testAllocation(taskAmount = 1.0,
+      expectedAssignedAddress = Array("3"),
+      expectedAssignedAmount = Array(1.0),
+      expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.8, "3" -> 0.0))
+
+    // request 0.2 gpu; ExecutorResourcesAmounts should assign "2"
+    testAllocation(taskAmount = 0.2,
+      expectedAssignedAddress = Array("2"),
+      expectedAssignedAmount = Array(0.2),
+      expectedLeftRes = Map("0" -> 0.19998, "1" -> 0.0, "2" -> 0.6, "3" -> 0.0))
+
+    testRelease(releasedRes = Map("0" -> 0.80002, "1" -> 1.0, "2" -> 0.4, "3" -> 1.0),
+      expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    // request 1 gpu; ExecutorResourcesAmounts should assign "0"
+    testAllocation(taskAmount = 1.0,
+      expectedAssignedAddress = Array("0"),
+      expectedAssignedAmount = Array(1.0),
+      expectedLeftRes = Map("0" -> 0.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    testRelease(releasedRes = Map("0" -> 1.0),
+      expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    // request 2 gpus; ExecutorResourcesAmounts should assign "0" and "1"
+    testAllocation(taskAmount = 2.0,
+      expectedAssignedAddress = Array("0", "1"),
+      expectedAssignedAmount = Array(1.0, 1.0),
+      expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 1.0, "3" -> 1.0))
+
+    testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0),
+      expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+
+    // request 4 gpus; ExecutorResourcesAmounts should assign "0", "1", "2" and "3"
+    testAllocation(taskAmount = 4.0,
+      expectedAssignedAddress = Array("0", "1", "2", "3"),
+      expectedAssignedAmount = Array(1.0, 1.0, 1.0, 1.0),
+      expectedLeftRes = Map("0" -> 0.0, "1" -> 0.0, "2" -> 0.0, "3" -> 0.0))
+
+    testRelease(releasedRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0),
+      expectedLeftRes = Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 1.0))
+  }
+
+  test("Don't allow acquire resource or address that is not available") {
+    // Init Executor Resource.
+    val totalRes = Map("gpu" -> Map("0" -> 1.0, "1" -> 1.0, "2" -> 1.0, "3" -> 
1.0))
+    val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes)
+
+    // Acquire an address from a resource that doesn't exist
+    val e = intercept[SparkException] {
+      availableExecResAmounts.acquire(toInternalResourceMap(Map("fpga" -> 
Map("1" -> 1.0))))
+    }
+    assert(e.getMessage.contains("Try to acquire an address from fpga that 
doesn't exist"))
+
+    // Acquire an address that is not available
+    val e1 = intercept[SparkException] {
+      availableExecResAmounts.acquire(toInternalResourceMap(Map("gpu" -> 
Map("6" -> 1.0))))
+    }
+    assert(e1.getMessage.contains("Try to acquire an address that doesn't 
exist"))
+  }
+
+  test("Don't allow release resource or address that is not available") {
+    // Init Executor Resource.
+    val totalRes = Map("gpu" -> Map("0" -> 0.5, "1" -> 0.5, "2" -> 0.5, "3" -> 
0.5))
+    val availableExecResAmounts = new ExecutorResourcesAmounts(totalRes)
+
+    // Acquire an address from a resource that doesn't exist
+    val e = intercept[SparkException] {
+      availableExecResAmounts.release(toInternalResourceMap(Map("fpga" -> 
Map("1" -> 0.1))))
+    }
+    assert(e.getMessage.contains("Try to release an address from fpga that 
doesn't exist"))
+
+    // Acquire an address that is not available
+    val e1 = intercept[SparkException] {
+      availableExecResAmounts.release(toInternalResourceMap(Map("gpu" -> 
Map("6" -> 0.1))))
+    }
+    assert(e1.getMessage.contains("Try to release an address that is not 
assigned"))
+  }
+
+}
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
index b36363d0f4cd..5aaacd2ec1c3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskDescriptionSuite.scala
@@ -24,7 +24,7 @@ import java.util.Properties
 import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobArtifactSet, SparkFunSuite}
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.ResourceAmountUtils
 import org.apache.spark.resource.ResourceUtils.GPU
 
 class TaskDescriptionSuite extends SparkFunSuite {
@@ -59,8 +59,10 @@ class TaskDescriptionSuite extends SparkFunSuite {
       }
     }
 
-    val originalResources =
-      Map(GPU -> new ResourceInformation(GPU, Array("1", "2", "3")))
+    val originalResources = Map(GPU ->
+      Map("1" -> ResourceAmountUtils.toInternalResource(0.2),
+        "2" -> ResourceAmountUtils.toInternalResource(0.5),
+        "3" -> ResourceAmountUtils.toInternalResource(0.1)))
 
     // Create a dummy byte buffer for the task.
     val taskBuffer = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
@@ -99,17 +101,8 @@ class TaskDescriptionSuite extends SparkFunSuite {
     assert(decodedTaskDescription.artifacts.equals(artifacts))
     
assert(decodedTaskDescription.properties.equals(originalTaskDescription.properties))
     assert(decodedTaskDescription.cpus.equals(originalTaskDescription.cpus))
-    assert(equalResources(decodedTaskDescription.resources, 
originalTaskDescription.resources))
+    assert(decodedTaskDescription.resources === 
originalTaskDescription.resources)
     assert(decodedTaskDescription.serializedTask.equals(taskBuffer))
-
-    def equalResources(original: Map[String, ResourceInformation],
-        target: Map[String, ResourceInformation]): Boolean = {
-      original.size == target.size && original.forall { case (name, info) =>
-        target.get(name).exists { targetInfo =>
-          info.name.equals(targetInfo.name) &&
-            info.addresses.sameElements(targetInfo.addresses)
-        }
-      }
-    }
   }
+
 }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 2ab7df0d9cfd..72d0354c5577 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -22,8 +22,10 @@ import java.util.Properties
 import java.util.concurrent.{CountDownLatch, ExecutorService, 
LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 
+import scala.collection.mutable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.duration._
+import scala.language.implicitConversions
 import scala.language.reflectiveCalls
 
 import org.mockito.ArgumentMatchers.{any, anyInt, anyString, eq => meq}
@@ -33,7 +35,8 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.internal.config
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, 
TaskResourceProfile, TaskResourceRequests}
+import org.apache.spark.resource.{ExecutorResourceRequests, 
ResourceAmountUtils, ResourceProfile, TaskResourceProfile, TaskResourceRequests}
+import org.apache.spark.resource.ResourceAmountUtils.{ONE_ENTIRE_RESOURCE}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.status.api.v1.ThreadStackTrace
@@ -145,6 +148,15 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     taskScheduler
   }
 
+  // Convert resources to ExecutorResourcesAmounts automatically
+  implicit def toExecutorResourcesAmounts(resources: Map[String, 
mutable.Buffer[String]]):
+      ExecutorResourcesAmounts = {
+    // convert the old resources to ExecutorResourcesAmounts
+    new ExecutorResourcesAmounts(resources.map { case (rName, addresses) =>
+      rName -> addresses.map(address => address -> ONE_ENTIRE_RESOURCE).toMap
+    })
+  }
+
   test("SPARK-32653: Decommissioned host/executor should be considered as 
inactive") {
     val scheduler = setupScheduler()
     val exec0 = "exec0"
@@ -1738,7 +1750,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     val singleCoreWorkerOffers =
       IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None, 
resources))
     val zeroGpuWorkerOffers =
-      IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None, 
Map.empty))
+      IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None))
     taskScheduler.submitTasks(taskSet)
     // WorkerOffer doesn't contain GPU resource, don't launch any task.
     var taskDescriptions = 
taskScheduler.resourceOffers(zeroGpuWorkerOffers).flatten
@@ -1748,8 +1760,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     taskDescriptions = 
taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
     assert(2 === taskDescriptions.length)
     assert(!failedTaskSet)
-    assert(ArrayBuffer("0") === 
taskDescriptions(0).resources.get(GPU).get.addresses)
-    assert(ArrayBuffer("1") === 
taskDescriptions(1).resources.get(GPU).get.addresses)
+    assert(ArrayBuffer("0") === 
taskDescriptions(0).resources.get(GPU).get.keys.toArray.sorted)
+    assert(ArrayBuffer("1") === 
taskDescriptions(1).resources.get(GPU).get.keys.toArray.sorted)
   }
 
   test("Scheduler correctly accounts for GPUs per task with fractional 
amount") {
@@ -1775,9 +1787,9 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     val taskDescriptions = 
taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
     assert(3 === taskDescriptions.length)
     assert(!failedTaskSet)
-    assert(ArrayBuffer("0") === 
taskDescriptions(0).resources.get(GPU).get.addresses)
-    assert(ArrayBuffer("0") === 
taskDescriptions(1).resources.get(GPU).get.addresses)
-    assert(ArrayBuffer("0") === 
taskDescriptions(2).resources.get(GPU).get.addresses)
+    assert(ArrayBuffer("0") === 
taskDescriptions(0).resources.get(GPU).get.keys.toArray.sorted)
+    assert(ArrayBuffer("0") === 
taskDescriptions(1).resources.get(GPU).get.keys.toArray.sorted)
+    assert(ArrayBuffer("0") === 
taskDescriptions(2).resources.get(GPU).get.keys.toArray.sorted)
   }
 
   test("Scheduler works with multiple ResourceProfiles and gpus") {
@@ -1815,10 +1827,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     var has1Gpu = 0
     for (tDesc <- taskDescriptions) {
       assert(tDesc.resources.contains(GPU))
-      if (tDesc.resources(GPU).addresses.length == 2) {
+      if (tDesc.resources(GPU).keys.size == 2) {
         has2Gpus += 1
       }
-      if (tDesc.resources(GPU).addresses.length == 1) {
+      if (tDesc.resources(GPU).keys.size == 1) {
         has1Gpu += 1
       }
     }
@@ -1830,13 +1842,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     // clear the first 2 worker offers so they don't have any room and add a 
third
     // for the resource profile
     val workerOffers3 = IndexedSeq(
-      new WorkerOffer("executor0", "host0", 0, None, Map.empty),
-      new WorkerOffer("executor1", "host1", 0, None, Map.empty, rp.id),
+      new WorkerOffer("executor0", "host0", 0, None),
+      new WorkerOffer("executor1", "host1", 0, None, 
ExecutorResourcesAmounts.empty, rp.id),
       new WorkerOffer("executor2", "host2", 6, None, resources3, rp.id))
     taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten
     assert(2 === taskDescriptions.length)
     assert(taskDescriptions.head.resources.contains(GPU))
-    assert(2 == taskDescriptions.head.resources(GPU).addresses.length)
+    assert(2 == taskDescriptions.head.resources(GPU).keys.size)
   }
 
   test("Scheduler works with task resource profiles") {
@@ -1875,10 +1887,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     var has1Gpu = 0
     for (tDesc <- taskDescriptions) {
       assert(tDesc.resources.contains(GPU))
-      if (tDesc.resources(GPU).addresses.length == 2) {
+      if (tDesc.resources(GPU).keys.size == 2) {
         has2Gpus += 1
       }
-      if (tDesc.resources(GPU).addresses.length == 1) {
+      if (tDesc.resources(GPU).keys.size == 1) {
         has1Gpu += 1
       }
     }
@@ -1890,13 +1902,13 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     // clear the first 2 worker offers so they don't have any room and add a 
third
     // for the resource profile
     val workerOffers3 = IndexedSeq(
-      WorkerOffer("executor0", "host0", 0, None, Map.empty),
-      WorkerOffer("executor1", "host1", 0, None, Map.empty),
+      WorkerOffer("executor0", "host0", 0, None),
+      WorkerOffer("executor1", "host1", 0, None),
       WorkerOffer("executor2", "host2", 4, None, resources3))
     taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten
     assert(2 === taskDescriptions.length)
     assert(taskDescriptions.head.resources.contains(GPU))
-    assert(2 == taskDescriptions.head.resources(GPU).addresses.length)
+    assert(2 == taskDescriptions.head.resources(GPU).keys.size)
   }
 
   test("Calculate available tasks slots for task resource profiles") {
@@ -1922,11 +1934,11 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     val workerOffers =
       IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0),
         WorkerOffer("executor1", "host1", 4, None, resources1))
-    val availableResourcesAmount = workerOffers.map(_.resources).map { 
resourceMap =>
-        // available addresses already takes into account if there are 
fractional
-        // task resource requests
-        resourceMap.map { case (name, addresses) => (name, addresses.length) }
-      }
+    val availableResourcesAmount = workerOffers.map(_.resources).map { 
resAmounts =>
+      // available addresses already takes into account if there are fractional
+      // task resource requests
+      resAmounts.resourceAddressAmount
+    }
 
     val taskSlotsForRp = TaskSchedulerImpl.calculateAvailableSlots(
       taskScheduler, taskScheduler.conf, rp.id, 
workerOffers.map(_.resourceProfileId).toArray,
@@ -2283,4 +2295,425 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     taskScheduler.handleFailedTask(tsm, tid, state, reason)
   }
 
+  private implicit def toInternalResource(resources: Map[String, Double]): 
Map[String, Long] =
+    resources.map { case (k, v) => k -> 
ResourceAmountUtils.toInternalResource(v) }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = 
ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can 
" +
+        s"restrict $taskNum $barrier tasks run in the same executor") {
+        val taskCpus = 1
+        val executorCpus = 100 // cpu will not limit the concurrent tasks 
number
+        val executorGpus = 1
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> taskCpus.toString,
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createTaskSet(100)
+        } else {
+          FakeTask.createBarrierTaskSet(4 * taskNum)
+        }
+
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 
1.0, "3" -> 1.0))))
+
+        val workerOffers =
+          IndexedSeq(WorkerOffer("executor0", "host0", executorCpus, 
Some("host0"), resources))
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        var gpuAddress = -1
+        for (taskId <- 0 until 4 * taskNum) {
+          if (taskId % taskNum == 0) {
+            gpuAddress += 1
+          }
+          assert(ArrayBuffer(gpuAddress.toString) ===
+            
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        }
+      }
+    }
+  }
+
+  // 4 executors, each of which has 1 GPU
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = 
ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 default rp with task.gpu.amount=${gpuTaskAmount} can 
" +
+        s"restrict $taskNum $barrier tasks run on the different executor") {
+        val taskCpus = 1
+        val executorCpus = 100 // cpu will not limit the concurrent tasks 
number
+        val executorGpus = 1
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> taskCpus.toString,
+          TASK_GPU_ID.amountConf -> gpuTaskAmount.toString,
+          EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createTaskSet(100)
+        } else {
+          FakeTask.createBarrierTaskSet(4 * taskNum)
+        }
+
+        val workerOffers =
+          IndexedSeq(
+            WorkerOffer("executor0", "host0", executorCpus, Some("host0"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("0" -> 1.0))))),
+            WorkerOffer("executor1", "host1", executorCpus, Some("host1"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("1" -> 1.0))))),
+            WorkerOffer("executor2", "host2", executorCpus, Some("host2"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("2" -> 1.0))))),
+            WorkerOffer("executor3", "host3", executorCpus, Some("host3"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("3" -> 1.0))))))
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements
+
+        val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        val assignedGpus: HashMap[String, Int] = HashMap.empty
+        for (taskId <- 0 until 4 * taskNum) {
+          val gpus = 
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
+          assert(gpus.length == 1)
+          val addr = gpus(0)
+          if (!assignedGpus.contains(addr)) {
+            assignedGpus(addr) = 1
+          } else {
+            assignedGpus(addr) += 1
+          }
+        }
+        assert(assignedGpus.toMap ===
+          Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
+      }
+    }
+  }
+
+  // 1 executor with 4 GPUS
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = 
ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 TaskResourceProfile with 
task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run in the same executor") {
+        val executorCpus = 100 // cpu will not limit the concurrent tasks 
number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> "0.1",
+          EXECUTOR_GPU_ID.amountConf -> "4",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 
gpuTaskAmount)
+        val rp = new TaskResourceProfile(treqs.requests)
+        taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        } else {
+          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
+        }
+        val resources = new ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 
1.0, "3" -> 1.0))))
+
+        val workerOffers = IndexedSeq(
+          WorkerOffer("executor0", "host0", executorCpus, Some("host0"), 
resources, rp.id)
+        )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements.
+        val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        var gpuAddress = -1
+        for (taskId <- 0 until 4 * taskNum) {
+          if (taskId % taskNum == 0) {
+            gpuAddress += 1
+          }
+          assert(ArrayBuffer(gpuAddress.toString) ===
+            
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted)
+        }
+      }
+    }
+  }
+
+  // 4 executors, each of which has 1 GPU
+  Seq(true, false).foreach { barrierMode =>
+    val barrier = if (barrierMode) "barrier" else ""
+    (1 to 20).foreach { taskNum =>
+      val gpuTaskAmount = 
ResourceAmountUtils.toFractionalResource(ONE_ENTIRE_RESOURCE / taskNum)
+      test(s"SPARK-45527 TaskResourceProfile with 
task.gpu.amount=${gpuTaskAmount} can " +
+        s"restrict $taskNum $barrier tasks run on the different executor") {
+        val executorCpus = 100 // cpu will not limit the concurrent tasks 
number
+
+        val taskScheduler = setupScheduler(numCores = executorCpus,
+          config.CPUS_PER_TASK.key -> "1",
+          TASK_GPU_ID.amountConf -> "0.1",
+          EXECUTOR_GPU_ID.amountConf -> "1",
+          config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+        val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 
gpuTaskAmount)
+        val rp = new TaskResourceProfile(treqs.requests)
+        taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+        val taskSet = if (barrierMode) {
+          FakeTask.createTaskSet(100, 0, 1, 1, rp.id)
+        } else {
+          FakeTask.createBarrierTaskSet(4 * taskNum, 0, 1, 1, rp.id)
+        }
+
+        val workerOffers =
+          IndexedSeq(
+            WorkerOffer("executor0", "host0", executorCpus, Some("host1"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("0" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor1", "host1", executorCpus, Some("host2"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("1" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor2", "host2", executorCpus, Some("host3"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("2" -> 1.0)))),
+              rp.id),
+            WorkerOffer("executor3", "host3", executorCpus, Some("host4"),
+              new ExecutorResourcesAmounts(Map(GPU -> 
toInternalResource(Map("3" -> 1.0)))),
+              rp.id)
+          )
+
+        taskScheduler.submitTasks(taskSet)
+        // Launch tasks on executor that satisfies resource requirements
+
+        val taskDescriptions = 
taskScheduler.resourceOffers(workerOffers).flatten
+        assert(4 * taskNum === taskDescriptions.length)
+        assert(!failedTaskSet)
+        val assignedGpus: HashMap[String, Int] = HashMap.empty
+        for (taskId <- 0 until 4 * taskNum) {
+          val gpus = 
taskDescriptions(taskId).resources.get(GPU).get.keys.toArray.sorted
+          assert(gpus.length == 1)
+          val addr = gpus(0)
+          if (!assignedGpus.contains(addr)) {
+            assignedGpus(addr) = 1
+          } else {
+            assignedGpus(addr) += 1
+          }
+        }
+        assert(assignedGpus.toMap ===
+          Map("0" -> taskNum, "1" -> taskNum, "2" -> taskNum, "3" -> taskNum))
+      }
+    }
+  }
+
+  test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 
executor " +
+    "can assign to other taskset") {
+    val taskCpus = 1
+    val taskGpus = 0.3
+    val executorGpus = 4
+    val executorCpus = 1000
+
+    // each tasks require 0.3 gpu
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    // each task require 0.7 gpu
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, 
stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val workerOffers =
+      IndexedSeq(
+        // cpu won't be a problem
+        WorkerOffer("executor0", "host0", 1000, None, new 
ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 
1.0, "3" -> 1.0)))))
+      )
+
+    taskScheduler.submitTasks(lowerTaskSet)
+    taskScheduler.submitTasks(higherRpTaskSet)
+
+    // should have 3 for default profile and 2 for additional resource profile
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(8 === taskDescriptions.length)
+    var index = 0
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      assert(addresses.length == 1)
+      if (index < 4) { // the first 4 tasks will grab 0.7 gpu
+        assert(addresses(0) == index.toString)
+        assert(ResourceAmountUtils.toFractionalResource(
+          tDesc.resources.get(GPU).get(index.toString)) == 0.7)
+      } else {
+        assert(addresses(0) == (index - 4).toString)
+        assert(ResourceAmountUtils.toFractionalResource(
+          tDesc.resources.get(GPU).get((index - 4).toString)) == 0.3)
+      }
+      index += 1
+    }
+  }
+
+  test("SPARK-45527 TaskResourceProfile: the left gpu resources on multiple 
executors " +
+    "can assign to other taskset") {
+    val taskCpus = 1
+    val taskGpus = 0.3
+    val executorGpus = 4
+    val executorCpus = 1000
+
+    // each tasks require 0.3 gpu
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    // each task require 0.7 gpu
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, 
stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val workerOffers =
+      IndexedSeq(
+        // cpu won't be a problem
+        WorkerOffer("executor0", "host0", 1000, None, new 
ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0))))),
+        WorkerOffer("executor1", "host1", 1000, None, new 
ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("1" -> 1.0))))),
+        WorkerOffer("executor2", "host2", 1000, None, new 
ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("2" -> 1.0))))),
+        WorkerOffer("executor3", "host3", 1000, None, new 
ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("3" -> 1.0)))))
+      )
+
+    taskScheduler.submitTasks(lowerTaskSet)
+    taskScheduler.submitTasks(higherRpTaskSet)
+
+    // should have 3 for default profile and 2 for additional resource profile
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(8 === taskDescriptions.length)
+
+    var index = 0
+    val higherAssignedExecutorsGpus = ArrayBuffer[(String, String)]()
+    val lowerAssignedExecutorsGpus = ArrayBuffer[(String, String)]()
+
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      assert(addresses.length == 1)
+      val address = addresses(0)
+
+      //    Executor 0,       executor 1,           executor 2,      executor 3
+      // task_0.7, task_03  task_0.7, task_03  task_0.7, task_03  task_0.7, 
task_03
+      if (index % 2 == 0) {
+        higherAssignedExecutorsGpus.append(
+          (tDesc.executorId, address))
+        assert(ResourceAmountUtils.toFractionalResource(
+          tDesc.resources.get(GPU).get(address)) == 0.7)
+      } else {
+        lowerAssignedExecutorsGpus.append(
+          (tDesc.executorId, address))
+        assert(ResourceAmountUtils.toFractionalResource(
+          tDesc.resources.get(GPU).get(address)) == 0.3)
+      }
+      index += 1
+    }
+
+    assert(higherAssignedExecutorsGpus.sorted sameElements
+      ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), 
("executor3", "3")
+      ))
+    assert(lowerAssignedExecutorsGpus.sorted sameElements
+      ArrayBuffer(("executor0", "0"), ("executor1", "1"), ("executor2", "2"), 
("executor3", "3")
+      ))
+  }
+
+  test("SPARK-45527 TaskResourceProfile: the left multiple gpu resources on 1 
executor " +
+    "can't assign to other taskset due to not enough gpu resource") {
+    val taskCpus = 1
+    val taskGpus = 0.4
+    val executorGpus = 4
+    val executorCpus = 4
+
+    // each tasks require 0.3 gpu
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+    val lowerTaskSet = FakeTask.createTaskSet(100, 1, 0, 1,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+
+    // each task require 0.7 gpu
+    val treqs = new TaskResourceRequests().cpus(1).resource(GPU, 0.7)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val higherRpTaskSet = FakeTask.createTaskSet(1000, stageId = 2, 
stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val workerOffers =
+      IndexedSeq(
+        // cpu won't be a problem
+        WorkerOffer("executor0", "host0", 1000, None, new 
ExecutorResourcesAmounts(
+          Map(GPU -> toInternalResource(Map("0" -> 1.0, "1" -> 1.0, "2" -> 
1.0, "3" -> 1.0)))))
+      )
+
+    taskScheduler.submitTasks(lowerTaskSet)
+    taskScheduler.submitTasks(higherRpTaskSet)
+
+    // only offer the resources to the higher taskset
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(4 === taskDescriptions.length)
+    var index = 0
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      val addresses = tDesc.resources.get(GPU).get.keys.toArray.sorted
+      assert(addresses.length == 1)
+      assert(addresses(0) == index.toString)
+      assert(ResourceAmountUtils.toFractionalResource(
+        tDesc.resources.get(GPU).get(index.toString)) == 0.7)
+      index += 1
+    }
+  }
+
+  test("SPARK-45527 schedule tasks for a barrier taskSet if all tasks can be 
launched together") {
+    val taskCpus = 2
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]",
+      config.CPUS_PER_TASK.key -> taskCpus.toString)
+
+    val numFreeCores = 3
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, 
Some("192.168.0.101:49625")),
+      new WorkerOffer("executor1", "host1", numFreeCores, 
Some("192.168.0.101:49627")),
+      new WorkerOffer("executor2", "host2", numFreeCores, 
Some("192.168.0.101:49629")))
+    val attempt1 = FakeTask.createBarrierTaskSet(3)
+
+    // submit attempt 1, offer some resources, all tasks get launched together
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(3 === taskDescriptions.length)
+  }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 2f8b6df8beac..26b38bfcc9ab 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -38,7 +38,8 @@ import org.apache.spark.executor.{ExecutorMetrics, 
TaskMetrics}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.{SKIP_VALIDATE_CORES_TESTING, 
TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED}
-import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
+import org.apache.spark.resource.ResourceAmountUtils.ONE_ENTIRE_RESOURCE
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -1825,7 +1826,8 @@ class TaskSetManagerSuite
     val taskSet = FakeTask.createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
-    val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, 
Array("0", "1")))
+    val taskResourceAssignments = Map(
+      GPU -> Map("0" -> ONE_ENTIRE_RESOURCE, "1" -> ONE_ENTIRE_RESOURCE))
     val taskOption =
       manager.resourceOffer("exec1", "host1", NO_PREF, 2, 
taskResourceAssignments)._1
     assert(taskOption.isDefined)
@@ -1833,7 +1835,9 @@ class TaskSetManagerSuite
     val allocatedResources = taskOption.get.resources
     assert(allocatedCpus == 2)
     assert(allocatedResources.size == 1)
-    assert(allocatedResources(GPU).addresses sameElements Array("0", "1"))
+    assert(allocatedResources(GPU).keys.toArray.sorted sameElements Array("0", 
"1"))
+    assert(allocatedResources === taskResourceAssignments)
+
   }
 
   test("SPARK-26755 Ensure that a speculative task is submitted only once for 
execution") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to