This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 474b1bb  [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
474b1bb is described below

commit 474b1bb5c2bce2f83c4dd8e19b9b7c5b3aebd6c4
Author: Thomas Graves <tgra...@nvidia.com>
AuthorDate: Thu Mar 26 09:46:36 2020 -0500

    [SPARK-29154][CORE] Update Spark scheduler for stage level scheduling
    
    ### What changes were proposed in this pull request?
    
    These are the core scheduler changes to support Stage level scheduling.
    
    The main changes here include modifications to the DAGScheduler to look at
    the ResourceProfiles associated with an RDD and have those applied inside the
    scheduler.
    Currently, if multiple RDDs in a stage have conflicting ResourceProfiles we
    throw an error; the logic to allow merging them will come in SPARK-29153. I
    added the interfaces on RDD to set and get the ResourceProfile so that I could
    add unit tests for the scheduler. These are marked as private for now until we
    finish the feature and will be exposed in SPARK-29150. If you think this is
    confusing I can remove those and the tests and add them back later.
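    For illustration, once the API is exposed in SPARK-29150 the intended usage of
    these hooks looks roughly like the sketch below. It assumes a cluster manager
    that supports this plus dynamic allocation; the profile construction uses the
    existing ResourceProfileBuilder / ExecutorResourceRequests / TaskResourceRequests
    builders, and the surrounding job is placeholder code.

    ```scala
    import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

    // Build a profile: executors with 4 cores and 1 GPU, tasks needing 1 cpu and 1 GPU.
    val rpb = new ResourceProfileBuilder()
    rpb.require(new ExecutorResourceRequests().cores(4).resource("gpu", 1))
    rpb.require(new TaskResourceRequests().cpus(1).resource("gpu", 1))
    val rp = rpb.build

    // Attach the profile to an RDD; the DAGScheduler applies it to the stage(s)
    // that compute this RDD (withResources/getResourceProfile are still
    // private[spark] in this change).
    val rdd = sc.parallelize(1 to 100, 10).map(_ * 2).withResources(rp)
    assert(rdd.getResourceProfile() == rp)
    rdd.collect()
    ```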
    I modified the task scheduler to only schedule task sets on executors whose
    resource profile exactly matches the task set's. It then checks those
    executors to make sure their currently free resources meet the task needs
    before assigning a task. As part of this I changed the way we do custom
    resource assignment.
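    A minimal, self-contained sketch of that matching rule (with simplified
    stand-in types, not the real scheduler classes) is:

    ```scala
    // Simplified model of the new offer matching in TaskSchedulerImpl: an offer is
    // only eligible if its ResourceProfile id matches the task set's exactly, and
    // then its free cpus and custom resource addresses must cover one more task.
    case class Offer(resourceProfileId: Int, freeCpus: Int, freeGpuAddrs: Seq[String])
    case class TaskNeeds(resourceProfileId: Int, cpusPerTask: Int, gpusPerTask: Int)

    def gpuAssignmentFor(offer: Offer, needs: TaskNeeds): Option[Seq[String]] = {
      if (offer.resourceProfileId != needs.resourceProfileId) return None // hard requirement for now
      if (offer.freeCpus < needs.cpusPerTask) return None                 // cpus checked first
      if (offer.freeGpuAddrs.size < needs.gpusPerTask) return None        // then custom resources
      Some(offer.freeGpuAddrs.take(needs.gpusPerTask))                    // addresses handed to the task
    }

    // gpuAssignmentFor(Offer(1, 4, Seq("0", "1")), TaskNeeds(1, 2, 1)) == Some(Seq("0"))
    // gpuAssignmentFor(Offer(0, 4, Seq("0", "1")), TaskNeeds(1, 2, 1)) == None  (profile mismatch)
    ```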
    Other changes here include having the cpus per task passed around so that we
    can properly account for them. Previously we just used the single global
    config, but now it can change based on the ResourceProfile.
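    A minimal sketch of the lookup that replaces those global reads (mirroring the
    small helper added to the ResourceProfile object in this diff):

    ```scala
    // Per-profile cpus-per-task: use the profile's value if set, otherwise fall
    // back to the global spark.task.cpus default.
    def taskCpusFor(profileTaskCpus: Option[Int], defaultCpusPerTask: Int): Int =
      profileTaskCpus.getOrElse(defaultCpusPerTask)

    // taskCpusFor(Some(2), 1) == 2   // profile overrides the global config
    // taskCpusFor(None, 1)    == 1   // default profile / nothing set
    ```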
    I removed the exceptions that required the cores to be the limiting resource.
    With this change, all the places I found that used executor cores / task cpus
    as slots have been updated to use the ResourceProfile logic and to look at
    which resource is limiting.
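    That slot calculation can be sketched as follows (a simplified standalone
    version of the new calculateAvailableSlots logic; the real code works per
    WorkerOffer and sums across offers):

    ```scala
    // Slots available on one executor offer, driven by the profile's limiting
    // resource rather than cores alone.
    def slotsForOffer(
        freeCpus: Int,
        taskCpus: Int,
        limitingIsCpus: Boolean,          // true when cpus (or nothing else) limit the profile
        coresLimitKnown: Boolean,         // false e.g. in standalone mode without executor cores set
        freeLimitingAddrs: Int,           // free addresses of the limiting custom resource
        taskAmountOfLimiting: Double): Int = {
      val cpuSlots = freeCpus / taskCpus
      if (limitingIsCpus) {
        cpuSlots
      } else {
        val resourceSlots = (freeLimitingAddrs / taskAmountOfLimiting).toInt
        // without a known cores limit we can't trust the precomputed limiting
        // resource, so take the smaller of the two counts
        if (coresLimitKnown) resourceSlots else math.min(cpuSlots, resourceSlots)
      }
    }
    ```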
    
    ### Why are the changes needed?
    
    Stage level scheduling feature.
    
    ### Does this PR introduce any user-facing change?
    
    No
    
    ### How was this patch tested?
    
    unit tests and lots of manual testing
    
    Closes #27773 from tgravescs/SPARK-29154.
    
    Lead-authored-by: Thomas Graves <tgra...@nvidia.com>
    Co-authored-by: Thomas Graves <tgra...@apache.org>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../main/scala/org/apache/spark/SparkContext.scala |  27 +--
 .../org/apache/spark/internal/config/Tests.scala   |   9 +
 core/src/main/scala/org/apache/spark/rdd/RDD.scala |  27 +++
 .../apache/spark/resource/ResourceProfile.scala    |  42 +++--
 .../spark/resource/ResourceProfileManager.scala    |  11 +-
 .../org/apache/spark/resource/ResourceUtils.scala  |  13 +-
 .../spark/resource/TaskResourceRequests.scala      |   2 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  70 +++++---
 .../apache/spark/scheduler/SchedulerBackend.scala  |   8 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 178 ++++++++++++++-----
 .../scala/org/apache/spark/scheduler/TaskSet.scala |   3 +-
 .../apache/spark/scheduler/TaskSetManager.scala    |  32 ++--
 .../org/apache/spark/scheduler/WorkerOffer.scala   |   5 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  24 ++-
 .../scheduler/local/LocalSchedulerBackend.scala    |   9 +-
 .../deploy/StandaloneDynamicAllocationSuite.scala  |   1 -
 .../CoarseGrainedExecutorBackendSuite.scala        |   4 +-
 .../CoarseGrainedSchedulerBackendSuite.scala       |  13 +-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 115 +++++++++++-
 .../scheduler/ExternalClusterManagerSuite.scala    |   3 +-
 .../org/apache/spark/scheduler/FakeTask.scala      |  31 +++-
 .../org/apache/spark/scheduler/PoolSuite.scala     |   4 +-
 .../scheduler/SchedulerIntegrationSuite.scala      |   5 +-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 192 ++++++++++++++++++++-
 .../spark/scheduler/TaskSetManagerSuite.scala      |  91 ++++++----
 .../mesos/MesosFineGrainedSchedulerBackend.scala   |   3 +-
 26 files changed, 704 insertions(+), 218 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index cdb98db..588e7dc 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1597,13 +1597,17 @@ class SparkContext(config: SparkConf) extends Logging {
   }
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be concurrent launched based on the 
ResourceProfile
+   * being used.
    * Note that please don't cache the value returned by this method, because 
the number can change
    * due to add/remove executors.
    *
+   * @param rp ResourceProfile which to use to calculate max concurrent tasks.
    * @return The max number of tasks that can be concurrent launched currently.
    */
-  private[spark] def maxNumConcurrentTasks(): Int = 
schedulerBackend.maxNumConcurrentTasks()
+  private[spark] def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
+    schedulerBackend.maxNumConcurrentTasks(rp)
+  }
 
   /**
    * Update the cluster manager on our scheduling needs. Three bits of 
information are included
@@ -2764,23 +2768,10 @@ object SparkContext extends Logging {
     // others its checked in ResourceProfile.
     def checkResourcesPerTask(executorCores: Int): Unit = {
       val taskCores = sc.conf.get(CPUS_PER_TASK)
-      validateTaskCpusLargeEnough(executorCores, taskCores)
-      val defaultProf = sc.resourceProfileManager.defaultResourceProfile
-      // TODO - this is temporary until all of stage level scheduling feature 
is integrated,
-      // fail if any other resource limiting due to dynamic allocation and 
scheduler using
-      // slots based on cores
-      val cpuSlots = executorCores/taskCores
-      val limitingResource = defaultProf.limitingResource(sc.conf)
-      if (limitingResource.nonEmpty && 
!limitingResource.equals(ResourceProfile.CPUS) &&
-        defaultProf.maxTasksPerExecutor(sc.conf) < cpuSlots) {
-        throw new IllegalArgumentException("The number of slots on an executor 
has to be " +
-          "limited by the number of cores, otherwise you waste resources and " 
+
-          "some scheduling doesn't work properly. Your configuration has " +
-          s"core/task cpu slots = ${cpuSlots} and " +
-          s"${limitingResource} = " +
-          s"${defaultProf.maxTasksPerExecutor(sc.conf)}. Please adjust your 
configuration " +
-          "so that all resources require same number of executor slots.")
+      if (!sc.conf.get(SKIP_VALIDATE_CORES_TESTING)) {
+        validateTaskCpusLargeEnough(sc.conf, executorCores, taskCores)
       }
+      val defaultProf = sc.resourceProfileManager.defaultResourceProfile
       ResourceUtils.warnOnWastedResources(defaultProf, sc.conf, 
Some(executorCores))
     }
 
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala 
b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
index 15610e8..33dac04 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Tests.scala
@@ -73,4 +73,13 @@ private[spark] object Tests {
       .booleanConf
       .createWithDefault(false)
 
+  // This configuration is used for unit tests to allow skipping the task cpus 
to cores validation
+  // to allow emulating standalone mode behavior while running in local mode. 
Standalone mode
+  // by default doesn't specify a number of executor cores, it just uses all 
the ones available
+  // on the host.
+  val SKIP_VALIDATE_CORES_TESTING =
+    ConfigBuilder("spark.testing.skipValidateCores")
+      .booleanConf
+      .createWithDefault(false)
+
 }
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a26b579..f59c587 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -42,6 +42,7 @@ import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
 import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
@@ -1716,11 +1717,37 @@ abstract class RDD[T: ClassTag](
   @Since("2.4.0")
   def barrier(): RDDBarrier[T] = withScope(new RDDBarrier[T](this))
 
+  /**
+   * Specify a ResourceProfile to use when calculating this RDD. This is only 
supported on
+   * certain cluster managers and currently requires dynamic allocation to be 
enabled.
+   * It will result in new executors with the resources specified being 
acquired to
+   * calculate the RDD.
+   */
+  // PRIVATE for now, added for testing purposes, will be made public with 
SPARK-29150
+  @Experimental
+  @Since("3.0.0")
+  private[spark] def withResources(rp: ResourceProfile): this.type = {
+    resourceProfile = Option(rp)
+    sc.resourceProfileManager.addResourceProfile(resourceProfile.get)
+    this
+  }
+
+  /**
+   * Get the ResourceProfile specified with this RDD or null if it wasn't 
specified.
+   * @return the user specified ResourceProfile or null (for Java 
compatibility) if
+   *         none was specified
+   */
+  // PRIVATE for now, added for testing purposes, will be made public with 
SPARK-29150
+  @Experimental
+  @Since("3.0.0")
+  private[spark] def getResourceProfile(): ResourceProfile = 
resourceProfile.getOrElse(null)
+
   // =======================================================================
   // Other internal methods and fields
   // =======================================================================
 
   private var storageLevel: StorageLevel = StorageLevel.NONE
+  private var resourceProfile: Option[ResourceProfile] = None
 
   /** User code that created this RDD (e.g. `textFile`, `parallelize`). */
   @transient private[spark] val creationSite = sc.getCallSite()
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 844026d..96c456e 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -76,6 +76,21 @@ class ResourceProfile(
     taskResources.get(ResourceProfile.CPUS).map(_.amount.toInt)
   }
 
+  /*
+   * This function takes into account fractional amounts for the task resource 
requirement.
+   * Spark only supports fractional amounts < 1 to basically allow for 
multiple tasks
+   * to use the same resource address.
+   * The way the scheduler handles this is it adds the same address the number 
of slots per
+   * address times and then the amount becomes 1. This way it re-uses the same 
address
+   * the correct number of times. ie task requirement amount=0.25 -> 
addrs["0", "0", "0", "0"]
+   * and scheduler task amount=1. See ResourceAllocator.slotsPerAddress.
+   */
+  private[spark] def getSchedulerTaskResourceAmount(resource: String): Int = {
+    val taskAmount = taskResources.getOrElse(resource,
+      throw new SparkException(s"Resource $resource doesn't exist in profile 
id: $id"))
+   if (taskAmount.amount < 1) 1 else taskAmount.amount.toInt
+  }
+
   private[spark] def getNumSlotsPerAddress(resource: String, sparkConf: 
SparkConf): Int = {
     _executorResourceSlotsPerAddr.getOrElse {
       calculateTasksAndLimitingResource(sparkConf)
@@ -137,7 +152,7 @@ class ResourceProfile(
       assert(cpusPerTask > 0, "CPUs per task configuration has to be > 0")
       val coresPerExecutor = 
getExecutorCores.getOrElse(sparkConf.get(EXECUTOR_CORES))
       _coresLimitKnown = true
-      ResourceUtils.validateTaskCpusLargeEnough(coresPerExecutor, cpusPerTask)
+      ResourceUtils.validateTaskCpusLargeEnough(sparkConf, coresPerExecutor, 
cpusPerTask)
       val tasksBasedOnCores = coresPerExecutor / cpusPerTask
       // Note that if the cores per executor aren't set properly this 
calculation could be off,
       // we default it to just be 1 in order to allow checking of the rest of 
the custom
@@ -163,17 +178,6 @@ class ResourceProfile(
         numPartsPerResourceMap(rName) = parts
         val numTasks = ((execReq.amount * parts) / numPerTask).toInt
         if (taskLimit == -1 || numTasks < taskLimit) {
-          if (shouldCheckExecCores) {
-            // TODO - until resource profiles full implemented we need to 
error if cores not
-            // limiting resource because the scheduler code uses that for slots
-            throw new IllegalArgumentException("The number of slots on an 
executor has to be " +
-              "limited by the number of cores, otherwise you waste resources 
and " +
-              "some scheduling doesn't work properly. Your configuration has " 
+
-              s"core/task cpu slots = ${taskLimit} and " +
-              s"${execReq.resourceName} = ${numTasks}. " +
-              "Please adjust your configuration so that all resources require 
same number " +
-              "of executor slots.")
-          }
           limitingResource = rName
           taskLimit = numTasks
         }
@@ -183,12 +187,6 @@ class ResourceProfile(
           "no corresponding task resource request was specified.")
       }
     }
-    if(!shouldCheckExecCores && execResourceToCheck.nonEmpty) {
-      // if we can't rely on the executor cores config throw a warning for user
-      logWarning("Please ensure that the number of slots available on your " +
-        "executors is limited by the number of cores to task cpus and not 
another " +
-        "custom resource.")
-    }
     if (taskResourcesToCheck.nonEmpty) {
       throw new SparkException("No executor resource configs were not 
specified for the " +
         s"following task configs: ${taskResourcesToCheck.keys.mkString(",")}")
@@ -319,4 +317,12 @@ object ResourceProfile extends Logging {
       rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
     rp.executorResources.filterKeys(k => 
!ResourceProfile.allSupportedExecutorResources.contains(k))
   }
+
+  /*
+   * Get the number of cpus per task if its set in the profile, otherwise 
return the
+   * cpus per task for the default profile.
+   */
+  private[spark] def getTaskCpusOrDefaultForProfile(rp: ResourceProfile, conf: 
SparkConf): Int = {
+    rp.getTaskCpus.getOrElse(conf.get(CPUS_PER_TASK))
+  }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 06db946..fabc0bd 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -41,7 +41,6 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf) extends Loggin
 
   def defaultResourceProfile: ResourceProfile = defaultProfile
 
-  private val taskCpusDefaultProfile = defaultProfile.getTaskCpus.get
   private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
   private val master = sparkConf.getOption("spark.master")
   private val isNotYarn = master.isDefined && !master.get.equals("yarn")
@@ -64,8 +63,10 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf) extends Loggin
     isSupported(rp)
     // force the computation of maxTasks and limitingResource now so we don't 
have cost later
     rp.limitingResource(sparkConf)
-    logInfo(s"Adding ResourceProfile id: ${rp.id}")
-    resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    val res = resourceProfileIdToResourceProfile.putIfAbsent(rp.id, rp)
+    if (res == null) {
+      logInfo(s"Added ResourceProfile id: ${rp.id}")
+    }
   }
 
   /*
@@ -79,8 +80,4 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf) extends Loggin
     }
     rp
   }
-
-  def taskCpusForProfileId(rpId: Int): Int = {
-    resourceProfileFromId(rpId).getTaskCpus.getOrElse(taskCpusDefaultProfile)
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 2227255..36ef906 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.resource.ResourceDiscoveryPlugin
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.{CPUS_PER_TASK, EXECUTOR_CORES, 
RESOURCES_DISCOVERY_PLUGIN, SPARK_TASK_PREFIX}
-import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING}
+import org.apache.spark.internal.config.Tests.{RESOURCES_WARNING_TESTING, 
SKIP_VALIDATE_CORES_TESTING}
 import org.apache.spark.util.Utils
 
 /**
@@ -392,7 +392,7 @@ private[spark] object ResourceUtils extends Logging {
       s"${resourceRequest.id.resourceName}")
   }
 
-  def validateTaskCpusLargeEnough(execCores: Int, taskCpus: Int): Boolean = {
+  def validateTaskCpusLargeEnough(sparkConf: SparkConf, execCores: Int, 
taskCpus: Int): Boolean = {
     // Number of cores per executor must meet at least one task requirement.
     if (execCores < taskCpus) {
       throw new SparkException(s"The number of cores per executor 
(=$execCores) has to be >= " +
@@ -414,7 +414,7 @@ private[spark] object ResourceUtils extends Logging {
     val coresKnown = rp.isCoresLimitKnown
     var limitingResource = rp.limitingResource(sparkConf)
     var maxTaskPerExec = rp.maxTasksPerExecutor(sparkConf)
-    val taskCpus = rp.getTaskCpus.getOrElse(sparkConf.get(CPUS_PER_TASK))
+    val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, 
sparkConf)
     val cores = if (execCores.isDefined) {
       execCores.get
     } else if (coresKnown) {
@@ -455,11 +455,12 @@ private[spark] object ResourceUtils extends Logging {
 
     taskReq.foreach { case (rName, treq) =>
       val execAmount = execReq(rName).amount
+      // handles fractional
+      val taskAmount = rp.getSchedulerTaskResourceAmount(rName)
       val numParts = rp.getNumSlotsPerAddress(rName, sparkConf)
-      // handle fractional
-      val taskAmount = if (numParts > 1) 1 else treq.amount
       if (maxTaskPerExec < (execAmount * numParts / taskAmount)) {
-        val taskReqStr = s"${taskAmount}/${numParts}"
+        val origTaskAmount = treq.amount
+        val taskReqStr = s"${origTaskAmount}/${numParts}"
         val resourceNumSlots = Math.floor(execAmount * numParts / 
taskAmount).toInt
         val message = s"The configuration of resource: ${treq.resourceName} " +
           s"(exec = ${execAmount}, task = ${taskReqStr}, " +
diff --git 
a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala 
b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
index 9624b51..9a5114f 100644
--- a/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
+++ b/core/src/main/scala/org/apache/spark/resource/TaskResourceRequests.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.resource.ResourceProfile._
 
 /**
- * A set of task resource requests. This is used in conjuntion with the 
ResourceProfile to
+ * A set of task resource requests. This is used in conjunction with the 
ResourceProfile to
  * programmatically specify the resources needed for an RDD that will be 
applied at the
  * stage level.
  *
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index a226b65..079cf11 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -385,15 +385,17 @@ private[spark] class DAGScheduler(
   def createShuffleMapStage[K, V, C](
       shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
     val rdd = shuffleDep.rdd
+    val (shuffleDeps, resourceProfiles) = 
getShuffleDependenciesAndResourceProfiles(rdd)
+    val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
     checkBarrierStageWithDynamicAllocation(rdd)
-    checkBarrierStageWithNumSlots(rdd)
+    checkBarrierStageWithNumSlots(rdd, resourceProfile)
     checkBarrierStageWithRDDChainPattern(rdd, rdd.getNumPartitions)
     val numTasks = rdd.partitions.length
-    val parents = getOrCreateParentStages(rdd, jobId)
+    val parents = getOrCreateParentStages(shuffleDeps, jobId)
     val id = nextStageId.getAndIncrement()
     val stage = new ShuffleMapStage(
       id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, 
mapOutputTracker,
-      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+      resourceProfile.id)
 
     stageIdToStage(id) = stage
     shuffleIdToMapStage(shuffleDep.shuffleId) = stage
@@ -433,14 +435,32 @@ private[spark] class DAGScheduler(
    * the check fails consecutively beyond a configured number for a job, then 
fail current job
    * submission.
    */
-  private def checkBarrierStageWithNumSlots(rdd: RDD[_]): Unit = {
+  private def checkBarrierStageWithNumSlots(rdd: RDD[_], rp: ResourceProfile): 
Unit = {
     val numPartitions = rdd.getNumPartitions
-    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks
+    val maxNumConcurrentTasks = sc.maxNumConcurrentTasks(rp)
     if (rdd.isBarrier() && numPartitions > maxNumConcurrentTasks) {
       throw new BarrierJobSlotsNumberCheckFailed(numPartitions, 
maxNumConcurrentTasks)
     }
   }
 
+  private[scheduler] def mergeResourceProfilesForStage(
+      stageResourceProfiles: HashSet[ResourceProfile]): ResourceProfile = {
+    logDebug(s"Merging stage rdd profiles: $stageResourceProfiles")
+    val resourceProfile = if (stageResourceProfiles.size > 1) {
+      // add option later to actually merge profiles - SPARK-29153
+      throw new IllegalArgumentException("Multiple ResourceProfile's specified 
in the RDDs for " +
+        "this stage, please resolve the conflicting ResourceProfile's as Spark 
doesn't" +
+        "currently support merging them.")
+    } else {
+      if (stageResourceProfiles.size == 1) {
+        stageResourceProfiles.head
+      } else {
+        sc.resourceProfileManager.defaultResourceProfile
+      }
+    }
+    resourceProfile
+  }
+
   /**
    * Create a ResultStage associated with the provided jobId.
    */
@@ -450,24 +470,27 @@ private[spark] class DAGScheduler(
       partitions: Array[Int],
       jobId: Int,
       callSite: CallSite): ResultStage = {
+    val (shuffleDeps, resourceProfiles) = 
getShuffleDependenciesAndResourceProfiles(rdd)
+    val resourceProfile = mergeResourceProfilesForStage(resourceProfiles)
     checkBarrierStageWithDynamicAllocation(rdd)
-    checkBarrierStageWithNumSlots(rdd)
+    checkBarrierStageWithNumSlots(rdd, resourceProfile)
     checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
-    val parents = getOrCreateParentStages(rdd, jobId)
+    val parents = getOrCreateParentStages(shuffleDeps, jobId)
     val id = nextStageId.getAndIncrement()
-    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, 
callSite,
-      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
+      callSite, resourceProfile.id)
     stageIdToStage(id) = stage
     updateJobIdStageIdMaps(jobId, stage)
     stage
   }
 
   /**
-   * Get or create the list of parent stages for a given RDD.  The new Stages 
will be created with
-   * the provided firstJobId.
+   * Get or create the list of parent stages for the given shuffle 
dependencies. The new
+   * Stages will be created with the provided firstJobId.
    */
-  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): 
List[Stage] = {
-    getShuffleDependencies(rdd).map { shuffleDep =>
+  private def getOrCreateParentStages(shuffleDeps: 
HashSet[ShuffleDependency[_, _, _]],
+      firstJobId: Int): List[Stage] = {
+    shuffleDeps.map { shuffleDep =>
       getOrCreateShuffleMapStage(shuffleDep, firstJobId)
     }.toList
   }
@@ -485,7 +508,8 @@ private[spark] class DAGScheduler(
       val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         visited += toVisit
-        getShuffleDependencies(toVisit).foreach { shuffleDep =>
+        val (shuffleDeps, _) = 
getShuffleDependenciesAndResourceProfiles(toVisit)
+        shuffleDeps.foreach { shuffleDep =>
           if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
             ancestors.prepend(shuffleDep)
             waitingForVisit.prepend(shuffleDep.rdd)
@@ -497,10 +521,11 @@ private[spark] class DAGScheduler(
   }
 
   /**
-   * Returns shuffle dependencies that are immediate parents of the given RDD.
+   * Returns shuffle dependencies that are immediate parents of the given RDD 
and the
+   * ResourceProfiles associated with the RDDs for this stage.
    *
-   * This function will not return more distant ancestors.  For example, if C 
has a shuffle
-   * dependency on B which has a shuffle dependency on A:
+   * This function will not return more distant ancestors for shuffle 
dependencies. For example,
+   * if C has a shuffle dependency on B which has a shuffle dependency on A:
    *
    * A <-- B <-- C
    *
@@ -508,9 +533,10 @@ private[spark] class DAGScheduler(
    *
    * This function is scheduler-visible for the purpose of unit testing.
    */
-  private[scheduler] def getShuffleDependencies(
-      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
+  private[scheduler] def getShuffleDependenciesAndResourceProfiles(
+      rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], 
HashSet[ResourceProfile]) = {
     val parents = new HashSet[ShuffleDependency[_, _, _]]
+    val resourceProfiles = new HashSet[ResourceProfile]
     val visited = new HashSet[RDD[_]]
     val waitingForVisit = new ListBuffer[RDD[_]]
     waitingForVisit += rdd
@@ -518,6 +544,7 @@ private[spark] class DAGScheduler(
       val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         visited += toVisit
+        Option(toVisit.getResourceProfile).foreach(resourceProfiles += _)
         toVisit.dependencies.foreach {
           case shuffleDep: ShuffleDependency[_, _, _] =>
             parents += shuffleDep
@@ -526,7 +553,7 @@ private[spark] class DAGScheduler(
         }
       }
     }
-    parents
+    (parents, resourceProfiles)
   }
 
   /**
@@ -1253,7 +1280,8 @@ private[spark] class DAGScheduler(
       logInfo(s"Submitting ${tasks.size} missing tasks from $stage 
(${stage.rdd}) (first 15 " +
         s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
       taskScheduler.submitTasks(new TaskSet(
-        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, 
properties))
+        tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, 
properties,
+        stage.resourceProfileId))
     } else {
       // Because we posted SparkListenerStageSubmitted earlier, we should mark
       // the stage as completed here in case there are no tasks to run
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
index 4752353..a5bba64 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import org.apache.spark.resource.ResourceProfile
+
 /**
  * A backend interface for scheduling systems that allows plugging in 
different ones under
  * TaskSchedulerImpl. We assume a Mesos-like model where the application gets 
resource offers as
@@ -80,12 +82,14 @@ private[spark] trait SchedulerBackend {
   def getDriverAttributes: Option[Map[String, String]] = None
 
   /**
-   * Get the max number of tasks that can be concurrent launched currently.
+   * Get the max number of tasks that can be concurrent launched based on the 
ResourceProfile
+   * being used.
    * Note that please don't cache the value returned by this method, because 
the number can change
    * due to add/remove executors.
    *
+   * @param rp ResourceProfile which to use to calculate max concurrent tasks.
    * @return The max number of tasks that can be concurrent launched currently.
    */
-  def maxNumConcurrentTasks(): Int
+  def maxNumConcurrentTasks(rp: ResourceProfile): Int
 
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 1b197c4..7e2fbb4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,7 +31,7 @@ import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
-import org.apache.spark.resource.ResourceUtils
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
@@ -93,9 +93,6 @@ private[spark] class TaskSchedulerImpl(
   // CPUs to request per task
   val CPUS_PER_TASK = conf.get(config.CPUS_PER_TASK)
 
-  // Resources to request per task
-  val resourcesReqsPerTask = ResourceUtils.parseResourceRequirements(sc.conf, 
SPARK_TASK_PREFIX)
-
   // TaskSetManagers are not thread safe, so any access to one should be 
synchronized
   // on this class.  Protected by `this`
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, 
TaskSetManager]]
@@ -209,7 +206,8 @@ private[spark] class TaskSchedulerImpl(
 
   override def submitTasks(taskSet: TaskSet): Unit = {
     val tasks = taskSet.tasks
-    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " 
tasks")
+    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " 
tasks "
+      + "resource profile " + taskSet.resourceProfileId)
     this.synchronized {
       val manager = createTaskSetManager(taskSet, maxTaskFailures)
       val stage = taskSet.stageId
@@ -340,39 +338,49 @@ private[spark] class TaskSchedulerImpl(
     for (i <- 0 until shuffledOffers.size) {
       val execId = shuffledOffers(i).executorId
       val host = shuffledOffers(i).host
-      if (availableCpus(i) >= CPUS_PER_TASK &&
-        resourcesMeetTaskRequirements(availableResources(i))) {
-        try {
-          for (task <- taskSet.resourceOffer(execId, host, maxLocality, 
availableResources(i))) {
-            tasks(i) += task
-            val tid = task.taskId
-            taskIdToTaskSetManager.put(tid, taskSet)
-            taskIdToExecutorId(tid) = execId
-            executorIdToRunningTaskIds(execId).add(tid)
-            availableCpus(i) -= CPUS_PER_TASK
-            assert(availableCpus(i) >= 0)
-            task.resources.foreach { case (rName, rInfo) =>
-              // Remove the first n elements from availableResources 
addresses, these removed
-              // addresses are the same as that we allocated in 
taskSet.resourceOffer() since it's
-              // synchronized. We don't remove the exact addresses allocated 
because the current
-              // approach produces the identical result with less time 
complexity.
-              availableResources(i).getOrElse(rName,
-                throw new SparkException(s"Try to acquire resource $rName that 
doesn't exist."))
-                .remove(0, rInfo.addresses.size)
-            }
-            // Only update hosts for a barrier task.
-            if (taskSet.isBarrier) {
-              // The executor address is expected to be non empty.
-              addressesWithDescs += (shuffledOffers(i).address.get -> task)
+      val taskSetRpID = taskSet.taskSet.resourceProfileId
+      // make the resource profile id a hard requirement for now - ie only put 
tasksets
+      // on executors where resource profile exactly matches.
+      if (taskSetRpID == shuffledOffers(i).resourceProfileId) {
+        val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, 
availableCpus(i),
+          availableResources(i))
+        taskResAssignmentsOpt.foreach { taskResAssignments =>
+          try {
+            val prof = 
sc.resourceProfileManager.resourceProfileFromId(taskSetRpID)
+            val taskCpus = 
ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
+            val taskDescOption = taskSet.resourceOffer(execId, host, 
maxLocality,
+              taskResAssignments)
+            for (task <- taskDescOption) {
+              tasks(i) += task
+              val tid = task.taskId
+              taskIdToTaskSetManager.put(tid, taskSet)
+              taskIdToExecutorId(tid) = execId
+              executorIdToRunningTaskIds(execId).add(tid)
+              availableCpus(i) -= taskCpus
+              assert(availableCpus(i) >= 0)
+              task.resources.foreach { case (rName, rInfo) =>
+                // Remove the first n elements from availableResources 
addresses, these removed
+                // addresses are the same as that we allocated in 
taskResourceAssignments since it's
+                // synchronized. We don't remove the exact addresses allocated 
because the current
+                // approach produces the identical result with less time 
complexity.
+                availableResources(i).getOrElse(rName,
+                  throw new SparkException(s"Try to acquire resource $rName 
that doesn't exist."))
+                  .remove(0, rInfo.addresses.size)
+              }
+              // Only update hosts for a barrier task.
+              if (taskSet.isBarrier) {
+                // The executor address is expected to be non empty.
+                addressesWithDescs += (shuffledOffers(i).address.get -> task)
+              }
+              launchedTask = true
             }
-            launchedTask = true
+          } catch {
+            case e: TaskNotSerializableException =>
+              logError(s"Resource offer failed, task set ${taskSet.name} was 
not serializable")
+              // Do not offer resources for this task, but don't throw an 
error to allow other
+              // task sets to be submitted.
+              return launchedTask
           }
-        } catch {
-          case e: TaskNotSerializableException =>
-            logError(s"Resource offer failed, task set ${taskSet.name} was not 
serializable")
-            // Do not offer resources for this task, but don't throw an error 
to allow other
-            // task sets to be submitted.
-            return launchedTask
         }
       }
     }
@@ -381,12 +389,81 @@ private[spark] class TaskSchedulerImpl(
 
   /**
    * Check whether the resources from the WorkerOffer are enough to run at 
least one task.
+   * Returns None if the resources don't meet the task requirements, otherwise 
returns
+   * the task resource assignments to give to the next task. Note that the 
assignments maybe
+   * be empty if no custom resources are used.
    */
-  private def resourcesMeetTaskRequirements(resources: Map[String, 
Buffer[String]]): Boolean = {
-    val resourcesFree = resources.map(r => r._1 -> r._2.length)
-    val meetsReqs = ResourceUtils.resourcesMeetRequirements(resourcesFree, 
resourcesReqsPerTask)
-    logDebug(s"Resources meet task requirements is: $meetsReqs")
-    meetsReqs
+  private def resourcesMeetTaskRequirements(
+      taskSet: TaskSetManager,
+      availCpus: Int,
+      availWorkerResources: Map[String, Buffer[String]]
+      ): Option[Map[String, ResourceInformation]] = {
+    val rpId = taskSet.taskSet.resourceProfileId
+    val taskSetProf = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(taskSetProf, 
conf)
+    // check if the ResourceProfile has cpus first since that is common case
+    if (availCpus < taskCpus) return None
+    // only look at the resource other then cpus
+    val tsResources = ResourceProfile.getCustomTaskResources(taskSetProf)
+    if (tsResources.isEmpty) return Some(Map.empty)
+    val localTaskReqAssign = HashMap[String, ResourceInformation]()
+    // we go through all resources here so that we can make sure they match 
and also get what the
+    // assignments are for the next task
+    for ((rName, taskReqs) <- tsResources) {
+      val taskAmount = taskSetProf.getSchedulerTaskResourceAmount(rName)
+      availWorkerResources.get(rName) match {
+        case Some(workerRes) =>
+          if (workerRes.size >= taskAmount) {
+            localTaskReqAssign.put(rName, new ResourceInformation(rName,
+              workerRes.take(taskAmount).toArray))
+          } else {
+            return None
+          }
+        case None => return None
+      }
+    }
+    Some(localTaskReqAssign.toMap)
+  }
+
+  // Use the resource that the resourceProfile has as the limiting resource to 
calculate the
+  // total number of slots available based on the current offers.
+  private def calculateAvailableSlots(
+      resourceProfileIds: Array[Int],
+      availableCpus: Array[Int],
+      availableResources: Array[Map[String, Buffer[String]]],
+      rpId: Int): Int = {
+    val resourceProfile = sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val offersForResourceProfile = resourceProfileIds.zipWithIndex.filter { 
case (id, _) =>
+      (id == resourceProfile.id)
+    }
+    val coresKnown = resourceProfile.isCoresLimitKnown
+    var limitingResource = resourceProfile.limitingResource(conf)
+    val taskCpus = 
ResourceProfile.getTaskCpusOrDefaultForProfile(resourceProfile, conf)
+
+    offersForResourceProfile.map { case (o, index) =>
+      val numTasksPerExecCores = availableCpus(index) / taskCpus
+      // if limiting resource is empty then we have no other resources, so it 
has to be CPU
+      if (limitingResource == ResourceProfile.CPUS || 
limitingResource.isEmpty) {
+        numTasksPerExecCores
+      } else {
+        val taskLimit = 
resourceProfile.taskResources.get(limitingResource).map(_.amount)
+          .getOrElse(throw new SparkException("limitingResource returns from 
ResourceProfile" +
+            s" $resourceProfile doesn't actually contain that task resource!")
+          )
+        // available addresses already takes into account if there are 
fractional
+        // task resource requests
+        val availAddrs = 
availableResources(index).get(limitingResource).map(_.size).getOrElse(0)
+        val resourceLimit = (availAddrs / taskLimit).toInt
+        if (!coresKnown) {
+          // when executor cores config isn't set, we can't calculate the real 
limiting resource
+          // and number of tasks per executor ahead of time, so calculate it 
now based on what
+          // is available.
+          if (numTasksPerExecCores <= resourceLimit) numTasksPerExecCores else 
resourceLimit
+        } else {
+          resourceLimit
+        }
+      }
+    }.sum
   }
 
   /**
@@ -429,9 +506,12 @@ private[spark] class TaskSchedulerImpl(
 
     val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
+    // Note the size estimate here might be off with different 
ResourceProfiles but should be
+    // close estimate
     val tasks = shuffledOffers.map(o => new 
ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
     val availableResources = shuffledOffers.map(_.resources).toArray
     val availableCpus = shuffledOffers.map(o => o.cores).toArray
+    val resourceProfileIds = shuffledOffers.map(o => 
o.resourceProfileId).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -441,19 +521,27 @@ private[spark] class TaskSchedulerImpl(
       }
     }
 
-    // Take each TaskSet in our scheduling order, and then offer it each node 
in increasing order
+    // Take each TaskSet in our scheduling order, and then offer it to each 
node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on 
all of them.
     // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, 
RACK_LOCAL, ANY
     for (taskSet <- sortedTaskSets) {
-      val availableSlots = availableCpus.map(c => c / CPUS_PER_TASK).sum
+      // we only need to calculate available slots if using barrier 
scheduling, otherwise the
+      // value is -1
+      val numBarrierSlotsAvailable = if (taskSet.isBarrier) {
+        val slots = calculateAvailableSlots(resourceProfileIds, availableCpus, 
availableResources,
+          taskSet.taskSet.resourceProfileId)
+        slots
+      } else {
+        -1
+      }
       // Skip the barrier taskSet if the available slots are less than the 
number of pending tasks.
-      if (taskSet.isBarrier && availableSlots < taskSet.numTasks) {
+      if (taskSet.isBarrier && numBarrierSlotsAvailable < taskSet.numTasks) {
         // Skip the launch process.
         // TODO SPARK-24819 If the job requires more slots than available 
(both busy and free
         // slots), fail the job on submit.
         logInfo(s"Skip current round of resource offers for barrier stage 
${taskSet.stageId} " +
           s"because the barrier taskSet requires ${taskSet.numTasks} slots, 
while the total " +
-          s"number of available slots is $availableSlots.")
+          s"number of available slots is $numBarrierSlotsAvailable.")
       } else {
         var launchedAnyTask = false
         // Record all the executor IDs assigned barrier tasks on.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
index 517c899..7a8ed16 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSet.scala
@@ -28,7 +28,8 @@ private[spark] class TaskSet(
     val stageId: Int,
     val stageAttemptId: Int,
     val priority: Int,
-    val properties: Properties) {
+    val properties: Properties,
+    val resourceProfileId: Int) {
   val id: String = stageId + "." + stageAttemptId
 
   override def toString: String = "TaskSet " + id
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 18684ee..2c79233 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -90,10 +90,18 @@ private[spark] class TaskSetManager(
   // SPARK-30417: #cores per executor might not be set in spark conf for 
standalone mode, then
   // the value of the conf would 1 by default. However, the executor would use 
all the cores on
   // the worker. Therefore, CPUS_PER_TASK is okay to be greater than 1 without 
setting #cores.
-  // To handle this case, we assume the minimum number of slots is 1.
+  // To handle this case, we set slots to 1 when we don't know the executor 
cores.
   // TODO: use the actual number of slots for standalone mode.
-  val speculationTasksLessEqToSlots =
-    numTasks <= Math.max(conf.get(EXECUTOR_CORES) / sched.CPUS_PER_TASK, 1)
+  val speculationTasksLessEqToSlots = {
+    val rpId = taskSet.resourceProfileId
+    val resourceProfile = 
sched.sc.resourceProfileManager.resourceProfileFromId(rpId)
+    val slots = if (!resourceProfile.isCoresLimitKnown) {
+      1
+    } else {
+      resourceProfile.maxTasksPerExecutor(conf)
+    }
+    numTasks <= slots
+  }
 
   // For each task, tracks whether a copy of the task has succeeded. A task 
will also be
   // marked as "succeeded" if it failed with a fetch failure, in which case it 
should not
@@ -394,7 +402,7 @@ private[spark] class TaskSetManager(
       execId: String,
       host: String,
       maxLocality: TaskLocality.TaskLocality,
-      availableResources: Map[String, Seq[String]] = Map.empty)
+      taskResourceAssignments: Map[String, ResourceInformation] = Map.empty)
     : Option[TaskDescription] =
   {
     val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
@@ -457,18 +465,8 @@ private[spark] class TaskSetManager(
         // val timeTaken = clock.getTime() - startTime
         val taskName = s"task ${info.id} in stage ${taskSet.id}"
         logInfo(s"Starting $taskName (TID $taskId, $host, executor 
${info.executorId}, " +
-          s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit()} bytes)")
-
-        val extraResources = sched.resourcesReqsPerTask.map { taskReq =>
-          val rName = taskReq.resourceName
-          val count = taskReq.amount
-          val rAddresses = availableResources.getOrElse(rName, Seq.empty)
-          assert(rAddresses.size >= count, s"Required $count $rName addresses, 
but only " +
-            s"${rAddresses.size} available.")
-          // We'll drop the allocated addresses later inside TaskSchedulerImpl.
-          val allocatedAddresses = rAddresses.take(count)
-          (rName, new ResourceInformation(rName, allocatedAddresses.toArray))
-        }.toMap
+          s"partition ${task.partitionId}, $taskLocality, 
${serializedTask.limit()} bytes) " +
+          s"taskResourceAssignments ${taskResourceAssignments}")
 
         sched.dagScheduler.taskStarted(task, info)
         new TaskDescription(
@@ -481,7 +479,7 @@ private[spark] class TaskSetManager(
           addedFiles,
           addedJars,
           task.localProperties,
-          extraResources,
+          taskResourceAssignments,
           serializedTask)
       }
     } else {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala 
b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
index 522dbfa..92a12f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/WorkerOffer.scala
@@ -19,6 +19,8 @@ package org.apache.spark.scheduler
 
 import scala.collection.mutable.Buffer
 
+import org.apache.spark.resource.ResourceProfile
+
 /**
  * Represents free resources available on an executor.
  */
@@ -30,4 +32,5 @@ case class WorkerOffer(
     // `address` is an optional hostPort string, it provide more useful 
information than `host`
     // when multiple executors are launched on the same host.
     address: Option[String] = None,
-    resources: Map[String, Buffer[String]] = Map.empty)
+    resources: Map[String, Buffer[String]] = Map.empty,
+    resourceProfileId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 6e1efda..cca8e86 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -145,7 +145,10 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
         if (TaskState.isFinished(state)) {
           executorDataMap.get(executorId) match {
             case Some(executorInfo) =>
-              executorInfo.freeCores += scheduler.CPUS_PER_TASK
+              val rpId = executorInfo.resourceProfileId
+              val prof = 
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+              val taskCpus = 
ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
+              executorInfo.freeCores += taskCpus
               resources.foreach { case (k, v) =>
                 executorInfo.resourcesInfo.get(k).foreach { r =>
                   r.release(v.addresses)
@@ -231,7 +234,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           totalCoreCount.addAndGet(cores)
           totalRegisteredExecutors.addAndGet(1)
           val resourcesInfo = resources.map { case (rName, info) =>
-            // tell the executor it can schedule resources up to numParts 
times,
+            // tell the executor it can schedule resources up to 
numSlotsPerAddress times,
             // as configured by the user, or set to 1 as that is the default 
(1 task/resource)
             val numParts = scheduler.sc.resourceProfileManager
               
.resourceProfileFromId(resourceProfileId).getNumSlotsPerAddress(rName, conf)
@@ -298,7 +301,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
               Some(executorData.executorAddress.hostPort),
               executorData.resourcesInfo.map { case (rName, rInfo) =>
                 (rName, rInfo.availableAddrs.toBuffer)
-              })
+              }, executorData.resourceProfileId)
         }.toIndexedSeq
         scheduler.resourceOffers(workOffers)
       }
@@ -327,7 +330,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
               Some(executorData.executorAddress.hostPort),
               executorData.resourcesInfo.map { case (rName, rInfo) =>
                 (rName, rInfo.availableAddrs.toBuffer)
-              }))
+              }, executorData.resourceProfileId))
           scheduler.resourceOffers(workOffers)
         } else {
           Seq.empty
@@ -359,7 +362,10 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           val executorData = executorDataMap(task.executorId)
           // Do resources allocation here. The allocated resources will get 
released after the task
           // finishes.
-          executorData.freeCores -= scheduler.CPUS_PER_TASK
+          val rpId = executorData.resourceProfileId
+          val prof = 
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+          val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, 
conf)
+          executorData.freeCores -= taskCpus
           task.resources.foreach { case (rName, rInfo) =>
             assert(executorData.resourcesInfo.contains(rName))
             executorData.resourcesInfo(rName).acquire(rInfo.addresses)
@@ -606,10 +612,10 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   }
 
-  override def maxNumConcurrentTasks(): Int = synchronized {
-    executorDataMap.values.map { executor =>
-      executor.totalCores / scheduler.CPUS_PER_TASK
-    }.sum
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = synchronized {
+    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
+    val executorsWithResourceProfile = 
executorDataMap.values.filter(_.resourceProfileId == rp.id)
+    executorsWithResourceProfile.map(_.totalCores / cpusPerTask).sum
   }
 
   // this function is for testing only
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
index 42a5afe..e2b1198 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
-import org.apache.spark.resource.ResourceInformation
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, 
ThreadSafeRpcEndpoint}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -162,7 +162,12 @@ private[spark] class LocalSchedulerBackend(
 
   override def applicationId(): String = appId
 
-  override def maxNumConcurrentTasks(): Int = totalCores / 
scheduler.CPUS_PER_TASK
+  // Doesn't support different ResourceProfiles yet
+  // so we expect all executors to be of same ResourceProfile
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
+    val cpusPerTask = ResourceProfile.getTaskCpusOrDefaultForProfile(rp, conf)
+    totalCores / cpusPerTask
+  }
 
   private def stop(finalState: SparkAppHandle.State): Unit = {
     localEndpoint.ask(StopExecutor)
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index f8b9930..0b2a58d 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -511,7 +511,6 @@ class StandaloneDynamicAllocationSuite
     val taskScheduler = mock(classOf[TaskSchedulerImpl])
     when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
     when(taskScheduler.resourceOffers(any())).thenReturn(Nil)
-    when(taskScheduler.resourcesReqsPerTask).thenReturn(Seq.empty)
     when(taskScheduler.sc).thenReturn(sc)
 
     val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, 
securityManager)
diff --git 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 3134a73..e0b5860 100644
--- 
a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -301,8 +301,8 @@ class CoarseGrainedExecutorBackendSuite extends 
SparkFunSuite
       val taskId = 1000000
       // We don't really verify the data, just pass it around.
       val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
-      val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000",
-        19, 1, mutable.Map.empty, mutable.Map.empty, new Properties,
+      val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 
1000000", 19,
+        1, mutable.Map.empty, mutable.Map.empty, new Properties,
         Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))), data)
       val serializedTaskDescription = TaskDescription.encode(taskDescription)
       backend.executor = mock[Executor]
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 7666c6c..f4745db 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -72,7 +72,7 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
       // Ensure all executors have been launched.
       assert(sc.getExecutorIds().length == 4)
     }
-    assert(sc.maxNumConcurrentTasks() == 12)
+    
assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf))
 == 12)
   }
 
   test("compute max number of concurrent tasks can be launched when 
spark.task.cpus > 1") {
@@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
       assert(sc.getExecutorIds().length == 4)
     }
     // Each executor can only launch one task since `spark.task.cpus` is 2.
-    assert(sc.maxNumConcurrentTasks() == 4)
+    
assert(sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf))
 == 4)
   }
 
   test("compute max number of concurrent tasks can be launched when some 
executors are busy") {
@@ -126,7 +126,8 @@ class CoarseGrainedSchedulerBackendSuite extends 
SparkFunSuite with LocalSparkCo
         assert(taskStarted.get())
         assert(taskEnded.get() == false)
         // Assert we count in slots on both busy and free executors.
-        assert(sc.maxNumConcurrentTasks() == 4)
+        assert(
+          
sc.maxNumConcurrentTasks(ResourceProfile.getOrCreateDefaultProfile(conf)) == 4)
       }
     } finally {
       sc.removeSparkListener(listener)
@@ -187,8 +188,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
   }
 
   test("extra resources from executor") {
+    import TestUtils._
+
+    val execCores = 3
     val conf = new SparkConf()
-      .set(EXECUTOR_CORES, 1)
+      .set(EXECUTOR_CORES, execCores)
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
       .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
       .setMaster(
@@ -294,7 +298,6 @@ private class CSMockExternalClusterManager extends ExternalClusterManager {
     when(ts.applicationAttemptId()).thenReturn(Some("attempt1"))
     when(ts.schedulingMode).thenReturn(SchedulingMode.FIFO)
     when(ts.nodeBlacklist()).thenReturn(Set.empty[String])
-    when(ts.resourcesReqsPerTask).thenReturn(Seq.empty)
     ts
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 4486389..33a14ce 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.broadcast.BroadcastManager
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.rdd.{DeterministicLevel, RDD}
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
@@ -2547,9 +2548,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   /**
    * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that
-   * getShuffleDependencies correctly returns the direct shuffle dependencies of a particular
-   * RDD. The test creates the following RDD graph (where n denotes a narrow dependency and s
-   * denotes a shuffle dependency):
+   * getShuffleDependenciesAndResourceProfiles correctly returns the direct shuffle dependencies
+   * of a particular RDD. The test creates the following RDD graph (where n denotes a narrow
+   * dependency and s denotes a shuffle dependency):
    *
    * A <------------s---------,
    *                           \
@@ -2558,7 +2559,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
    * Here, the direct shuffle dependency of C is just the shuffle dependency on B. The direct
    * shuffle dependencies of E are the shuffle dependency on A and the shuffle dependency on C.
    */
-  test("getShuffleDependencies correctly returns only direct shuffle parents") {
+  test("getShuffleDependenciesAndResourceProfiles correctly returns only direct shuffle parents") {
     val rddA = new MyRDD(sc, 2, Nil)
     val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
     val rddB = new MyRDD(sc, 2, Nil)
@@ -2569,11 +2570,16 @@ class DAGSchedulerSuite extends SparkFunSuite with 
LocalSparkContext with TimeLi
     val narrowDepD = new OneToOneDependency(rddD)
     val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker = mapOutputTracker)
 
-    assert(scheduler.getShuffleDependencies(rddA) === Set())
-    assert(scheduler.getShuffleDependencies(rddB) === Set())
-    assert(scheduler.getShuffleDependencies(rddC) === Set(shuffleDepB))
-    assert(scheduler.getShuffleDependencies(rddD) === Set(shuffleDepC))
-    assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC))
+    val (shuffleDepsA, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddA)
+    assert(shuffleDepsA === Set())
+    val (shuffleDepsB, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddB)
+    assert(shuffleDepsB === Set())
+    val (shuffleDepsC, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddC)
+    assert(shuffleDepsC === Set(shuffleDepB))
+    val (shuffleDepsD, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddD)
+    assert(shuffleDepsD === Set(shuffleDepC))
+    val (shuffleDepsE, _) = scheduler.getShuffleDependenciesAndResourceProfiles(rddE)
+    assert(shuffleDepsE === Set(shuffleDepA, shuffleDepC))
   }
 
   test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" +
@@ -3141,6 +3147,97 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assertDataStructuresEmpty()
   }
 
+  test("test default resource profile") {
+    val rdd = sc.parallelize(1 to 10).map(x => (x, x))
+    val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val rp = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    assert(rp.id == scheduler.sc.resourceProfileManager.defaultResourceProfile.id)
+  }
+
+  test("test 1 resource profile") {
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+    val rdd = sc.parallelize(1 to 10).map(x => (x, x)).withResources(rp1)
+    val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+    val rpMerged = scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    val expectedid = Option(rdd.getResourceProfile).map(_.id)
+    assert(expectedid.isDefined)
+    assert(expectedid.get != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    assert(rpMerged.id == expectedid.get)
+  }
+
+  test("test 2 resource profiles errors by default") {
+    import org.apache.spark.resource._
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+
+    val ereqs2 = new ExecutorResourceRequests().cores(2)
+    val treqs2 = new TaskResourceRequests().cpus(2)
+    val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build
+
+    val rdd = sc.parallelize(1 to 10).withResources(rp1).map(x => (x, x)).withResources(rp2)
+    val error = intercept[IllegalArgumentException] {
+      val (shuffledeps, resourceprofiles) = scheduler.getShuffleDependenciesAndResourceProfiles(rdd)
+      scheduler.mergeResourceProfilesForStage(resourceprofiles)
+    }.getMessage()
+
+    assert(error.contains("Multiple ResourceProfile's specified in the RDDs"))
+  }
+
+  /**
+   * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that
+   * getShuffleDependenciesAndResourceProfiles correctly returns the direct shuffle dependencies
+   * of a particular RDD. The test creates the following RDD graph (where n denotes a narrow
+   * dependency and s denotes a shuffle dependency):
+   *
+   * A <------------s---------,
+   *                           \
+   * B <--s-- C <--s-- D <--n------ E
+   *
+   * Here, the direct shuffle dependency of C is just the shuffle dependency on B. The direct
+   * shuffle dependencies of E are the shuffle dependency on A and the shuffle dependency on C.
+   */
+  test("getShuffleDependenciesAndResourceProfiles returns deps and profiles correctly") {
+    import org.apache.spark.resource._
+    val ereqs = new ExecutorResourceRequests().cores(4)
+    val treqs = new TaskResourceRequests().cpus(1)
+    val rp1 = new ResourceProfileBuilder().require(ereqs).require(treqs).build
+    val ereqs2 = new ExecutorResourceRequests().cores(6)
+    val treqs2 = new TaskResourceRequests().cpus(2)
+    val rp2 = new ResourceProfileBuilder().require(ereqs2).require(treqs2).build
+
+    val rddWithRp = new MyRDD(sc, 2, Nil).withResources(rp1)
+    val rddA = new MyRDD(sc, 2, Nil).withResources(rp1)
+    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(1))
+    val rddB = new MyRDD(sc, 2, Nil)
+    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(1))
+    val rddWithRpDep = new OneToOneDependency(rddWithRp)
+    val rddC = new MyRDD(sc, 1, List(rddWithRpDep, shuffleDepB)).withResources(rp2)
+    val shuffleDepC = new ShuffleDependency(rddC, new HashPartitioner(1))
+    val rddD = new MyRDD(sc, 1, List(shuffleDepC))
+    val narrowDepD = new OneToOneDependency(rddD)
+    val rddE = new MyRDD(sc, 1, List(shuffleDepA, narrowDepD), tracker = mapOutputTracker)
+
+    val (shuffleDepsA, rprofsA) = scheduler.getShuffleDependenciesAndResourceProfiles(rddA)
+    assert(shuffleDepsA === Set())
+    assert(rprofsA === Set(rp1))
+    val (shuffleDepsB, rprofsB) = scheduler.getShuffleDependenciesAndResourceProfiles(rddB)
+    assert(shuffleDepsB === Set())
+    assert(rprofsB === Set())
+    val (shuffleDepsC, rprofsC) = scheduler.getShuffleDependenciesAndResourceProfiles(rddC)
+    assert(shuffleDepsC === Set(shuffleDepB))
+    assert(rprofsC === Set(rp1, rp2))
+    val (shuffleDepsD, rprofsD) = scheduler.getShuffleDependenciesAndResourceProfiles(rddD)
+    assert(shuffleDepsD === Set(shuffleDepC))
+    assert(rprofsD === Set())
+    val (shuffleDepsE, rprofsE) = scheduler.getShuffleDependenciesAndResourceProfiles(rddE)
+    assert(shuffleDepsE === Set(shuffleDepA, shuffleDepC))
+    assert(rprofsE === Set())
+  }
+
   /**
    * Assert that the supplied TaskSet has exactly the given hosts as its preferred locations.
    * Note that this checks only the host and not the executor ID.
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
index 9f593e0..7ead51b 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.Map
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.executor.ExecutorMetrics
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.AccumulatorV2
@@ -71,7 +72,7 @@ private class DummySchedulerBackend extends SchedulerBackend {
   def stop(): Unit = {}
   def reviveOffers(): Unit = {}
   def defaultParallelism(): Int = 1
-  def maxNumConcurrentTasks(): Int = 0
+  def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0
 }
 
 private class DummyTaskScheduler extends TaskScheduler {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
index 8cb6268..9ec088a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala
@@ -21,6 +21,7 @@ import java.util.Properties
 
 import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.executor.TaskMetrics
+import org.apache.spark.resource.ResourceProfile
 
 class FakeTask(
     stageId: Int,
@@ -42,7 +43,12 @@ object FakeTask {
    * locations for each task (given as varargs) if this sequence is not empty.
    */
   def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*)
+    createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*)
+  }
+
+  def createTaskSet(numTasks: Int, rpId: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+    createTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, rpId, prefLocs: _*)
   }
 
   def createTaskSet(
@@ -50,7 +56,8 @@ object FakeTask {
       stageId: Int,
       stageAttemptId: Int,
       prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createTaskSet(numTasks, stageId, stageAttemptId, priority = 0, prefLocs: _*)
+    createTaskSet(numTasks, stageId, stageAttemptId, priority = 0,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*)
   }
 
   def createTaskSet(
@@ -58,6 +65,7 @@ object FakeTask {
       stageId: Int,
       stageAttemptId: Int,
       priority: Int,
+      rpId: Int,
       prefLocs: Seq[TaskLocation]*): TaskSet = {
     if (prefLocs.size != 0 && prefLocs.size != numTasks) {
       throw new IllegalArgumentException("Wrong number of task locations")
@@ -65,7 +73,7 @@ object FakeTask {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil)
     }
-    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId)
   }
 
   def createShuffleMapTaskSet(
@@ -91,11 +99,21 @@ object FakeTask {
       }, prefLocs(i), new Properties,
         SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array())
     }
-    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
   }
 
   def createBarrierTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0, prefLocs: _*)
+    createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0,
+      rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID, prefLocs: _*)
+  }
+
+  def createBarrierTaskSet(
+      numTasks: Int,
+      rpId: Int,
+      prefLocs: Seq[TaskLocation]*): TaskSet = {
+    createBarrierTaskSet(numTasks, stageId = 0, stageAttemptId = 0, priority = 0,
   }
 
   def createBarrierTaskSet(
@@ -103,6 +121,7 @@ object FakeTask {
       stageId: Int,
       stageAttemptId: Int,
       priority: Int,
+      rpId: Int,
       prefLocs: Seq[TaskLocation]*): TaskSet = {
     if (prefLocs.size != 0 && prefLocs.size != numTasks) {
       throw new IllegalArgumentException("Wrong number of task locations")
@@ -110,6 +129,6 @@ object FakeTask {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(stageId, i, if (prefLocs.size != 0) prefLocs(i) else Nil, isBarrier = true)
     }
-    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null)
+    new TaskSet(tasks, stageId, stageAttemptId, priority = priority, null, rpId)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index b953add..d9de976 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -22,6 +22,7 @@ import java.util.Properties
 
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.SchedulingMode._
 
 /**
@@ -39,7 +40,8 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val tasks = Array.tabulate[Task[_]](numTasks) { i =>
       new FakeTask(stageId, i, Nil)
     }
-    new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null), 0)
+    new TaskSetManager(taskScheduler, new TaskSet(tasks, stageId, 0, 0, null,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID), 0)
   }
 
   def scheduleTaskAndVerifyId(taskId: Int, rootPool: Pool, expectedStageId: Int): Unit = {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
index dff8975..0874163 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.TaskState._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SCHEDULER_REVIVE_INTERVAL
 import org.apache.spark.rdd.RDD
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.util.{CallSite, ThreadUtils, Utils}
 
 /**
@@ -385,7 +386,7 @@ private[spark] abstract class MockBackend(
     }.toIndexedSeq
   }
 
-  override def maxNumConcurrentTasks(): Int = 0
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0
 
   /**
    * This is called by the scheduler whenever it has tasks it would like to schedule, when a tasks
@@ -406,9 +407,9 @@ private[spark] abstract class MockBackend(
         (taskDescription, task)
       }
       newTasks.foreach { case (taskDescription, _) =>
+        freeCores -= taskScheduler.CPUS_PER_TASK
         executorIdToExecutor(taskDescription.executorId).freeCores -= taskScheduler.CPUS_PER_TASK
       }
-      freeCores -= newTasks.size * taskScheduler.CPUS_PER_TASK
       assignedTasksWaitingToRun ++= newTasks
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index e7ecf84..9ee84a8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -31,6 +31,7 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.util.ManualClock
@@ -40,7 +41,7 @@ class FakeSchedulerBackend extends SchedulerBackend {
   def stop(): Unit = {}
   def reviveOffers(): Unit = {}
   def defaultParallelism(): Int = 1
-  def maxNumConcurrentTasks(): Int = 0
+  def maxNumConcurrentTasks(rp: ResourceProfile): Int = 0
 }
 
 class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfterEach
@@ -202,7 +203,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       config.CPUS_PER_TASK.key -> taskCpus.toString)
     val numFreeCores = 1
     val taskSet = new TaskSet(
-      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)),
+      0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val multiCoreWorkerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", taskCpus),
       new WorkerOffer("executor1", "host1", numFreeCores))
     taskScheduler.submitTasks(taskSet)
@@ -216,7 +218,8 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     // still be processed without error
     taskScheduler.submitTasks(FakeTask.createTaskSet(1))
     val taskSet2 = new TaskSet(
-      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 1, 0, 0, null)
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)),
+      1, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     taskScheduler.submitTasks(taskSet2)
     taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
     assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
@@ -1135,6 +1138,96 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(0 === taskDescriptions.length)
   }
 
+  test("don't schedule for a barrier taskSet if available slots are less than " +
+    "pending tasks gpus limiting") {
+    val taskCpus = 1
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString,
+      "spark.executor.resource.gpu.amount" -> "1", "spark.task.resource.gpu.amount" -> "1")
+
+    val numFreeCores = 3
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625"),
+        Map("gpu" -> Seq("0").toBuffer)),
+      new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"),
+        Map("gpu" -> Seq("0").toBuffer)))
+    val attempt1 = FakeTask.createBarrierTaskSet(3)
+
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(0 === taskDescriptions.length)
+  }
+
+  test("schedule tasks for a barrier taskSet if all tasks can be launched together gpus") {
+    val taskCpus = 1
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString,
+      "spark.executor.resource.gpu.amount" -> "1", "spark.task.resource.gpu.amount" -> "1")
+
+    val numFreeCores = 3
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625"),
+        Map("gpu" -> Seq("0").toBuffer)),
+      new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"),
+        Map("gpu" -> Seq("0").toBuffer)),
+      new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629"),
+        Map("gpu" -> Seq("0").toBuffer)))
+    val attempt1 = FakeTask.createBarrierTaskSet(3)
+
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(3 === taskDescriptions.length)
+  }
+
+  // barrier scheduling doesn't yet work with dynamic allocation but test it with another
+  // ResourceProfile anyway to make sure code path works when it is supported
+  test("schedule tasks for a barrier taskSet if all tasks can be launched together " +
+    "diff ResourceProfile") {
+    val taskCpus = 1
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString)
+    val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 2)
+    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+    val rp = new ResourceProfile(execReqs.requests, taskReqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val numFreeCores = 2
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625"),
+        Map("gpu" -> Seq("0", "1").toBuffer), rp.id),
+      new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"),
+        Map("gpu" -> Seq("0", "1").toBuffer), rp.id))
+    val attempt1 = FakeTask.createBarrierTaskSet(3, rpId = rp.id)
+
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(3 === taskDescriptions.length)
+  }
+
+  test("schedule tasks for a barrier taskSet if all tasks can be launched together " +
+    "diff ResourceProfile, but not enough gpus") {
+    val taskCpus = 1
+    val taskScheduler = setupSchedulerWithMaster(
+      s"local[$taskCpus]", config.CPUS_PER_TASK.key -> taskCpus.toString)
+    val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 2)
+    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+    val rp = new ResourceProfile(execReqs.requests, taskReqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val numFreeCores = 2
+    // make each of the worker offers only have 1 GPU, thus making it not enough
+    val workerOffers = IndexedSeq(
+      new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625"),
+        Map("gpu" -> Seq("0").toBuffer), rp.id),
+      new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627"),
+        Map("gpu" -> Seq("0").toBuffer), rp.id))
+    val attempt1 = FakeTask.createBarrierTaskSet(3, rpId = rp.id)
+
+    taskScheduler.submitTasks(attempt1)
+    val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(0 === taskDescriptions.length)
+  }
+
   test("schedule tasks for a barrier taskSet if all tasks can be launched together") {
     val taskCpus = 2
     val taskScheduler = setupSchedulerWithMaster(
@@ -1165,8 +1258,10 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
       new WorkerOffer("executor0", "host0", numFreeCores, Some("192.168.0.101:49625")),
       new WorkerOffer("executor1", "host1", numFreeCores, Some("192.168.0.101:49627")),
       new WorkerOffer("executor2", "host2", numFreeCores, Some("192.168.0.101:49629")))
-    val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId = 0, priority = 1)
-    val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0, priority = 0)
+    val barrier = FakeTask.createBarrierTaskSet(3, stageId = 0, stageAttemptId = 0, priority = 1,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    val highPrio = FakeTask.createTaskSet(1, stageId = 1, stageAttemptId = 0, priority = 0,
+      rpId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
 
     // submit highPrio and barrier taskSet
     taskScheduler.submitTasks(highPrio)
@@ -1289,6 +1384,93 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(ArrayBuffer("1") === taskDescriptions(1).resources.get(GPU).get.addresses)
   }
 
+  test("Scheduler correctly accounts for GPUs per task with fractional amount") {
+    val taskCpus = 1
+    val taskGpus = 0.33
+    val executorGpus = 1
+    val executorCpus = 4
+
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString)
+    val taskSet = FakeTask.createTaskSet(5)
+
+    val numFreeCores = 4
+    val resources = Map(GPU -> ArrayBuffer("0", "0", "0"))
+    val singleCoreWorkerOffers =
+      IndexedSeq(new WorkerOffer("executor0", "host0", numFreeCores, None, resources))
+
+    taskScheduler.submitTasks(taskSet)
+    // Launch tasks on executor that satisfies resource requirements.
+    var taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
+    assert(3 === taskDescriptions.length)
+    assert(!failedTaskSet)
+    assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses)
+    assert(ArrayBuffer("0") === taskDescriptions(1).resources.get(GPU).get.addresses)
+    assert(ArrayBuffer("0") === taskDescriptions(2).resources.get(GPU).get.addresses)
+  }
+
+  test("Scheduler works with multiple ResourceProfiles and gpus") {
+    val taskCpus = 1
+    val taskGpus = 1
+    val executorGpus = 4
+    val executorCpus = 4
+
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString)
+
+    val ereqs = new ExecutorResourceRequests().cores(6).resource(GPU, 6)
+    val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2)
+    val rp = new ResourceProfile(ereqs.requests, treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+    val taskSet = FakeTask.createTaskSet(3)
+    val rpTaskSet = FakeTask.createTaskSet(5, stageId = 1, stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val resourcesDefaultProf = Map(GPU -> ArrayBuffer("0", "1", "2", "3"))
+    val resources = Map(GPU -> ArrayBuffer("4", "5", "6", "7", "8", "9"))
+
+    val workerOffers =
+      IndexedSeq(new WorkerOffer("executor0", "host0", 2, None, resourcesDefaultProf),
+      new WorkerOffer("executor1", "host1", 6, None, resources, rp.id))
+    taskScheduler.submitTasks(taskSet)
+    taskScheduler.submitTasks(rpTaskSet)
+    // should have 2 for default profile and 3 for additional resource profile
+    var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(5 === taskDescriptions.length)
+    var has2Gpus = 0
+    var has1Gpu = 0
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      if (tDesc.resources(GPU).addresses.size == 2) {
+        has2Gpus += 1
+      }
+      if (tDesc.resources(GPU).addresses.size == 1) {
+        has1Gpu += 1
+      }
+    }
+    assert(has2Gpus == 3)
+    assert(has1Gpu == 2)
+
+    val resources3 = Map(GPU -> ArrayBuffer("14", "15", "16", "17", "18", "19"))
+
+    // clear the first 2 worker offers so they don't have any room and add a third
+    // for the resource profile
+    val workerOffers3 = IndexedSeq(
+      new WorkerOffer("executor0", "host0", 0, None, Map.empty),
+      new WorkerOffer("executor1", "host1", 0, None, Map.empty, rp.id),
+      new WorkerOffer("executor2", "host2", 6, None, resources3, rp.id))
+    taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten
+    assert(2 === taskDescriptions.length)
+    assert(taskDescriptions.head.resources.contains(GPU))
+    assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
+  }
+
   /**
    * Used by tests to simulate a task failure. This calls the failure handler explicitly, to ensure
    * that all the state is updated when this method returns. Otherwise, there's no way to know when
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index b740e35..4566e3c 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -34,6 +34,8 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config
+import org.apache.spark.internal.config.Tests.SKIP_VALIDATE_CORES_TESTING
+import org.apache.spark.resource.{ResourceInformation, ResourceProfile}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -213,7 +215,6 @@ class TaskSetManagerSuite
     super.afterEach()
   }
 
-
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
@@ -657,7 +658,8 @@ class TaskSetManagerSuite
     sc = new SparkContext("local", "test")
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
 
-    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0, null)
+    val taskSet = new TaskSet(Array(new LargeTask(0)), 0, 0, 0,
+      null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     assert(!manager.emittedTaskSizeWarning)
@@ -672,7 +674,8 @@ class TaskSetManagerSuite
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
 
     val taskSet = new TaskSet(
-      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)), 0, 0, 0, null)
+      Array(new NotSerializableFakeTask(1, 0), new NotSerializableFakeTask(0, 1)),
+      0, 0, 0, null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     intercept[TaskNotSerializableException] {
@@ -743,7 +746,8 @@ class TaskSetManagerSuite
     val singleTask = new ShuffleMapTask(0, 0, null, new Partition {
         override def index: Int = 0
       }, Seq(TaskLocation("host1", "execA")), new Properties, null)
-    val taskSet = new TaskSet(Array(singleTask), 0, 0, 0, null)
+    val taskSet = new TaskSet(Array(singleTask), 0, 0, 0,
+      null, ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
     // Offer host1, which should be accepted as a PROCESS_LOCAL location
@@ -1053,10 +1057,10 @@ class TaskSetManagerSuite
     }
     // Offer resources for 4 tasks to start
     for ((k, v) <- List(
-        "exec1" -> "host1",
-        "exec1" -> "host1",
-        "exec2" -> "host2",
-        "exec2" -> "host2")) {
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec2" -> "host2",
+      "exec2" -> "host2")) {
       val taskOption = manager.resourceOffer(k, v, NO_PREF)
       assert(taskOption.isDefined)
       val task = taskOption.get
@@ -1480,10 +1484,10 @@ class TaskSetManagerSuite
     }
     // Offer resources for 4 tasks to start
     for ((exec, host) <- Seq(
-        "exec1" -> "host1",
-        "exec1" -> "host1",
-        "exec3" -> "host3",
-        "exec2" -> "host2")) {
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec3" -> "host3",
+      "exec2" -> "host2")) {
       val taskOption = manager.resourceOffer(exec, host, NO_PREF)
       assert(taskOption.isDefined)
       val task = taskOption.get
@@ -1552,10 +1556,10 @@ class TaskSetManagerSuite
     }
     // Offer resources for 4 tasks to start
     for ((k, v) <- List(
-        "exec1" -> "host1",
-        "exec1" -> "host1",
-        "exec2" -> "host2",
-        "exec2" -> "host2")) {
+      "exec1" -> "host1",
+      "exec1" -> "host1",
+      "exec2" -> "host2",
+      "exec2" -> "host2")) {
       val taskOption = manager.resourceOffer(k, v, NO_PREF)
       assert(taskOption.isDefined)
       val task = taskOption.get
@@ -1655,7 +1659,7 @@ class TaskSetManagerSuite
     assert(FakeRackUtil.numBatchInvocation === 1)
   }
 
-  test("TaskSetManager allocate resource addresses from available resources") {
+  test("TaskSetManager passes task resource along") {
     import TestUtils._
 
     sc = new SparkContext("local", "test")
@@ -1664,15 +1668,13 @@ class TaskSetManagerSuite
     val taskSet = FakeTask.createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
-    val availableResources = Map(GPU -> ArrayBuffer("0", "1", "2", "3"))
-    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF, availableResources)
+    val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, Array("0", "1")))
+    val taskOption =
+      manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments)
     assert(taskOption.isDefined)
     val allocatedResources = taskOption.get.resources
     assert(allocatedResources.size == 1)
     assert(allocatedResources(GPU).addresses sameElements Array("0", "1"))
-    // Allocated resource addresses should still present in `availableResources`, they will only
-    // get removed inside TaskSchedulerImpl later.
-    assert(availableResources(GPU) sameElements Array("0", "1", "2", "3"))
   }
 
   test("SPARK-26755 Ensure that a speculative task is submitted only once for execution") {
@@ -1793,15 +1795,16 @@ class TaskSetManagerSuite
       numTasks: Int,
       numExecutorCores: Int,
       numCoresPerTask: Int): (TaskSetManager, ManualClock) = {
-    sc = new SparkContext("local", "test")
-    sc.conf.set(config.SPECULATION_ENABLED, true)
-    sc.conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
+    val conf = new SparkConf()
+    conf.set(config.SPECULATION_ENABLED, true)
+    conf.set(config.SPECULATION_QUANTILE.key, speculationQuantile.toString)
     // Set the number of slots per executor
-    sc.conf.set(config.EXECUTOR_CORES.key, numExecutorCores.toString)
-    sc.conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString)
+    conf.set(config.EXECUTOR_CORES.key, numExecutorCores.toString)
+    conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString)
     if (speculationThresholdOpt.isDefined) {
-      sc.conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get)
+      conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, speculationThresholdOpt.get)
     }
+    sc = new SparkContext("local", "test", conf)
     sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     // Create a task set with the given number of tasks
     val taskSet = FakeTask.createTaskSet(numTasks)
@@ -1890,15 +1893,28 @@ class TaskSetManagerSuite
 
   test("SPARK-30417 when spark.task.cpus is greater than spark.executor.cores due to " +
     "standalone settings, speculate if there is only one task in the stage") {
-    val (manager, clock) = testSpeculationDurationSetup(
-      Some("60min"),
-      // Set the quantile to be 1.0 so that regular speculation would not be triggered
-      speculationQuantile = 1.0,
-      numTasks = 1,
-      numExecutorCores = 1,
-      numCoresPerTask = 2
-    )
+    val numTasks = 1
+    val numCoresPerTask = 2
+    val conf = new SparkConf()
+    // skip throwing exception when cores per task > cores per executor to emulate standalone mode
+    conf.set(SKIP_VALIDATE_CORES_TESTING, true)
+    conf.set(config.SPECULATION_ENABLED, true)
+    conf.set(config.SPECULATION_QUANTILE.key, "1.0")
+    // Skip setting cores per executor to emulate standalone default mode
+    conf.set(config.CPUS_PER_TASK.key, numCoresPerTask.toString)
+    conf.set(config.SPECULATION_TASK_DURATION_THRESHOLD.key, "60min")
+    sc = new SparkContext("local", "test", conf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // Create a task set with the given number of tasks
+    val taskSet = FakeTask.createTaskSet(numTasks)
+    val clock = new ManualClock()
+    val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
+    manager.isZombie = false
 
+    // Offer resources for the task to start
+    for (i <- 1 to numTasks) {
+      manager.resourceOffer(s"exec$i", s"host$i", NO_PREF)
+    }
     clock.advance(1000*60*60)
     assert(!manager.checkSpeculatableTasks(0))
     assert(sched.speculativeTasks.size == 0)
@@ -1942,7 +1958,8 @@ class TaskSetManagerSuite
     TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
 
     val tasks = Array.tabulate[Task[_]](2)(partition => new FakeLongTasks(stageId = 0, partition))
-    val taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0, priority = 0, null)
+    val taskSet: TaskSet = new TaskSet(tasks, stageId = 0, stageAttemptId = 0, priority = 0, null,
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val stageId = taskSet.stageId
     val stageAttemptId = taskSet.stageAttemptId
     sched.submitTasks(taskSet)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index e2a9914..f1e3fca 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -31,6 +31,7 @@ import org.apache.spark.{SparkContext, SparkException, TaskState}
 import org.apache.spark.deploy.mesos.{config => mesosConfig}
 import org.apache.spark.executor.MesosExecutorBackend
 import org.apache.spark.internal.config
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.util.Utils
@@ -457,7 +458,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
       super.applicationId
     }
 
-  override def maxNumConcurrentTasks(): Int = {
+  override def maxNumConcurrentTasks(rp: ResourceProfile): Int = {
     // TODO SPARK-25074 support this method for MesosFineGrainedSchedulerBackend
     0
   }

