This is an automated email from the ASF dual-hosted git repository.

wuyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ab49dc21e43 [SPARK-39853][CORE] Support stage level task resource 
profile for standalone cluster when dynamic allocation disabled
ab49dc21e43 is described below

commit ab49dc21e43822abef5067f959e474c4c8dcfdff
Author: Tengfei Huang <tengfe...@gmail.com>
AuthorDate: Fri Sep 30 09:04:39 2022 +0800

    [SPARK-39853][CORE] Support stage level task resource profile for 
standalone cluster when dynamic allocation disabled
    
    ### What changes were proposed in this pull request?
    Currently stage level scheduling works for yarn/k8s/standalone cluster when 
dynamic allocation is enabled, and spark app will acquire executors with 
different resource profiles and assign tasks to executors with the same 
resource profile id.
    
    This PR proposed to add stage level scheduling when dynamic allocation is 
off. In this case, spark app will only have executors with default resource 
profiles, but different `Stages` can still customize their task resource 
requests which should be compatible with default resource profile executor 
resources. And all these `Stages` with different task resource requests will 
reuse/share the same set of executors with default resource profile.
    
    And this PR proposed to:
    1. Introduces a new special `ResourceProfile`: `TaskResourceProfile`, it 
can be used to describe different task resource requests when dynamic 
allocation is off. And tasks bind to this `TaskResourceProfile` will reuse 
executors with default resource profile.
    `Exception` should be thrown if executors with default resource profile can 
not fulfill the task resource requests.
    ```
    class TaskResourceProfile(override val taskResources: Map[String, 
TaskResourceRequest])
      extends ResourceProfile(
        
ResourceProfile.getOrCreateDefaultProfile(SparkEnv.get.conf).executorResources,
        taskResources)
    ```
    2. `DADScheduler` and `TaskScheduler` will schedule tasks with customized 
`ResourceProfile` based on resource profile type and resource profile Id, 
taskSets with `TaskResourceProfile` can be scheduled to executors with 
`DEFAULT_RESOURCE_PROFILE_ID` and other taskSets can be scheduled to executors 
with exactly same resource profile id.
    
    ### Why are the changes needed?
    When dynamic allocation is disabled, we can also leverage stage level 
schedule to customize task resource requests for different stages.
    
    ### Does this PR introduce _any_ user-facing change?
    Spark users can specify `TaskResourceProfile` to customize task resource 
requests for different stages when dynamic allocation is off.
    
    ### How was this patch tested?
    New UTs added.
    
    Closes #37268 from ivoson/stage-schedule-dynamic-off.
    
    Lead-authored-by: Tengfei Huang <tengfe...@gmail.com>
    Co-authored-by: Huang Tengfei <tengfe...@gmail.com>
    Signed-off-by: Yi Wu <yi...@databricks.com>
---
 .../spark/deploy/ApplicationDescription.scala      |  3 +-
 .../spark/deploy/master/ApplicationInfo.scala      |  4 +-
 .../apache/spark/resource/ResourceProfile.scala    | 63 ++++++++++----
 .../spark/resource/ResourceProfileBuilder.scala    |  6 +-
 .../spark/resource/ResourceProfileManager.scala    | 70 +++++++++++-----
 .../org/apache/spark/resource/ResourceUtils.scala  |  6 +-
 .../org/apache/spark/scheduler/DAGScheduler.scala  |  9 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 19 +++--
 .../resource/ResourceProfileManagerSuite.scala     | 37 +++++++++
 .../spark/resource/ResourceProfileSuite.scala      | 65 ++++++++++++++-
 .../apache/spark/scheduler/DAGSchedulerSuite.scala | 19 ++++-
 .../spark/scheduler/TaskSchedulerImplSuite.scala   | 97 +++++++++++++++++++++-
 docs/configuration.md                              |  4 +-
 docs/spark-standalone.md                           |  4 +-
 14 files changed, 344 insertions(+), 62 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala 
b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 39c2af01846..67d0d851b60 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -20,7 +20,6 @@ package org.apache.spark.deploy
 import java.net.URI
 
 import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, 
ResourceUtils}
-import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources
 
 private[spark] case class ApplicationDescription(
     name: String,
@@ -40,7 +39,7 @@ private[spark] case class ApplicationDescription(
   def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
   def resourceReqsPerExecutor: Seq[ResourceRequirement] =
     ResourceUtils.executorResourceRequestToRequirement(
-      
getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))
+      
defaultProfile.getCustomExecutorResources().values.toSeq.sortBy(_.resourceName))
 
   override def toString: String = "ApplicationDescription(" + name + ")"
 }
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index a2926ca64bc..e66933b84af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.deploy.ApplicationDescription
 import org.apache.spark.resource.{ResourceInformation, ResourceProfile, 
ResourceUtils}
-import org.apache.spark.resource.ResourceProfile.{getCustomExecutorResources, 
DEFAULT_RESOURCE_PROFILE_ID}
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.util.Utils
 
@@ -101,7 +101,7 @@ private[spark] class ApplicationInfo(
         .map(_.toInt)
         .getOrElse(defaultMemoryMbPerExecutor)
       val customResources = ResourceUtils.executorResourceRequestToRequirement(
-        
getCustomExecutorResources(resourceProfile).values.toSeq.sortBy(_.resourceName))
+        
resourceProfile.getCustomExecutorResources().values.toSeq.sortBy(_.resourceName))
 
       rpIdToResourceDesc(resourceProfile.id) =
         ExecutorResourceDescription(coresPerExecutor, memoryMbPerExecutor, 
customResources)
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 5e02c61459d..afd612433a7 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.{SparkConf, SparkContext, SparkException}
+import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException}
 import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -94,6 +94,15 @@ class ResourceProfile(
     executorResources.get(ResourceProfile.MEMORY).map(_.amount)
   }
 
+  private[spark] def getCustomTaskResources(): Map[String, 
TaskResourceRequest] = {
+    taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
+  }
+
+  protected[spark] def getCustomExecutorResources(): Map[String, 
ExecutorResourceRequest] = {
+    executorResources.
+      filterKeys(k => 
!ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
+  }
+
   /*
    * This function takes into account fractional amounts for the task resource 
requirement.
    * Spark only supports fractional amounts < 1 to basically allow for 
multiple tasks
@@ -182,8 +191,8 @@ class ResourceProfile(
     val numPartsPerResourceMap = new mutable.HashMap[String, Int]
     numPartsPerResourceMap(ResourceProfile.CORES) = 1
     val taskResourcesToCheck = new mutable.HashMap[String, TaskResourceRequest]
-    taskResourcesToCheck ++= ResourceProfile.getCustomTaskResources(this)
-    val execResourceToCheck = ResourceProfile.getCustomExecutorResources(this)
+    taskResourcesToCheck ++= this.getCustomTaskResources()
+    val execResourceToCheck = this.getCustomExecutorResources()
     execResourceToCheck.foreach { case (rName, execReq) =>
       val taskReq = taskResources.get(rName).map(_.amount).getOrElse(0.0)
       numPartsPerResourceMap(rName) = 1
@@ -242,7 +251,8 @@ class ResourceProfile(
 
   // check that the task resources and executor resources are equal, but id's 
could be different
   private[spark] def resourcesEqual(rp: ResourceProfile): Boolean = {
-    rp.taskResources == taskResources && rp.executorResources == 
executorResources
+    rp.taskResources == taskResources && rp.executorResources == 
executorResources &&
+      rp.getClass == this.getClass
   }
 
   override def hashCode(): Int = Seq(taskResources, 
executorResources).hashCode()
@@ -253,6 +263,40 @@ class ResourceProfile(
   }
 }
 
+/**
+ * Resource profile which only contains task resources, can be used for stage 
level task schedule
+ * when dynamic allocation is disabled, tasks will be scheduled to executors 
with default resource
+ * profile based on task resources described by this task resource profile.
+ * And when dynamic allocation is enabled, will require new executors for this 
profile based on
+ * the default executor resources requested at startup and assign tasks only 
on executors created
+ * with this resource profile.
+ *
+ * @param taskResources Resource requests for tasks. Mapped from the resource
+ *                      name (e.g., cores, memory, CPU) to its specific 
request.
+ */
+@Evolving
+@Since("3.4.0")
+private[spark] class TaskResourceProfile(
+    override val taskResources: Map[String, TaskResourceRequest])
+  extends ResourceProfile(Map.empty, taskResources) {
+
+  override protected[spark] def getCustomExecutorResources()
+      : Map[String, ExecutorResourceRequest] = {
+    if (SparkEnv.get == null) {
+      // This will be called in standalone master when dynamic allocation 
enabled.
+      return super.getCustomExecutorResources()
+    }
+
+    val sparkConf = SparkEnv.get.conf
+    if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
+      ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+        .getCustomExecutorResources()
+    } else {
+      super.getCustomExecutorResources()
+    }
+  }
+}
+
 object ResourceProfile extends Logging {
   // task resources
   /**
@@ -393,17 +437,6 @@ object ResourceProfile extends Logging {
     }
   }
 
-  private[spark] def getCustomTaskResources(
-      rp: ResourceProfile): Map[String, TaskResourceRequest] = {
-    rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
-  }
-
-  private[spark] def getCustomExecutorResources(
-      rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
-    rp.executorResources.
-      filterKeys(k => 
!ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
-  }
-
   /*
    * Get the number of cpus per task if its set in the profile, otherwise 
return the
    * cpus per task for the default profile.
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
index f6b30d32737..584ff32b447 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala
@@ -93,7 +93,11 @@ class ResourceProfileBuilder() {
   }
 
   def build(): ResourceProfile = {
-    new ResourceProfile(executorResources, taskResources)
+    if (_executorResources.isEmpty) {
+      new TaskResourceProfile(taskResources)
+    } else {
+      new ResourceProfile(executorResources, taskResources)
+    }
   }
 }
 
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 489d9c3e858..3f48aaded5c 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -59,35 +59,67 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
   private val testExceptionThrown = 
sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)
 
   /**
-   * If we use anything except the default profile, it's only supported on 
YARN and Kubernetes
-   * with dynamic allocation enabled. Throw an exception if not supported.
+   * If we use anything except the default profile, it's supported on YARN, 
Kubernetes and
+   * Standalone with dynamic allocation enabled, and task resource profile 
with dynamic allocation
+   * disabled on Standalone. Throw an exception if not supported.
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
-    val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-    val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
-      isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
-    val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
-      isNotDefaultProfile && (isYarn || isK8s || isStandalone) && 
!dynamicEnabled
-    // We want the exception to be thrown only when we are specifically 
testing for the
-    // exception or in a real application. Otherwise in all other testing 
scenarios we want
-    // to skip throwing the exception so that we can test in other modes to 
make testing easier.
-    if ((notRunningUnitTests || testExceptionThrown) &&
+    if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
+      if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
+        throw new SparkException("TaskResourceProfiles are only supported for 
Standalone " +
+          "cluster for now when dynamic allocation is disabled.")
+      }
+    } else {
+      val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+      val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
+        isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
+      val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
+        isNotDefaultProfile && (isYarn || isK8s || isStandalone) && 
!dynamicEnabled
+
+      // We want the exception to be thrown only when we are specifically 
testing for the
+      // exception or in a real application. Otherwise in all other testing 
scenarios we want
+      // to skip throwing the exception so that we can test in other modes to 
make testing easier.
+      if ((notRunningUnitTests || testExceptionThrown) &&
         (notYarnOrK8sOrStandaloneAndNotDefaultProfile ||
           YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) {
-      throw new SparkException("ResourceProfiles are only supported on YARN 
and Kubernetes " +
-        "and Standalone with dynamic allocation enabled.")
-    }
+        throw new SparkException("ResourceProfiles are only supported on YARN 
and Kubernetes " +
+          "and Standalone with dynamic allocation enabled.")
+      }
 
-    if (isStandalone && rp.getExecutorCores.isEmpty &&
-      sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
-      logWarning("Neither executor cores is set for resource profile, nor 
spark.executor.cores " +
-        "is explicitly set, you may get more executors allocated than 
expected. It's recommended " +
-        "to set executor cores explicitly. Please check SPARK-30299 for more 
details.")
+      if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
+        sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
+        logWarning("Neither executor cores is set for resource profile, nor 
spark.executor.cores " +
+          "is explicitly set, you may get more executors allocated than 
expected. " +
+          "It's recommended to set executor cores explicitly. " +
+          "Please check SPARK-30299 for more details.")
+      }
     }
 
     true
   }
 
+  /**
+   * Check whether a task with specific taskRpId can be scheduled to executors
+   * with executorRpId.
+   *
+   * Here are the rules:
+   * 1. When dynamic allocation is disabled, only [[TaskResourceProfile]] is 
supported,
+   *    and tasks with [[TaskResourceProfile]] can be scheduled to executors 
with default
+   *    resource profile.
+   * 2. For other scenarios(when dynamic allocation is enabled), tasks can be 
scheduled to
+   *    executors where resource profile exactly matches.
+   */
+  private[spark] def canBeScheduled(taskRpId: Int, executorRpId: Int): Boolean 
= {
+    assert(resourceProfileIdToResourceProfile.contains(taskRpId) &&
+      resourceProfileIdToResourceProfile.contains(executorRpId),
+      "Tasks and executors must have valid resource profile id")
+    val taskRp = resourceProfileFromId(taskRpId)
+
+    // When dynamic allocation disabled, tasks with TaskResourceProfile can 
always reuse
+    // all the executors with default resource profile.
+    taskRpId == executorRpId || (!dynamicEnabled && 
taskRp.isInstanceOf[TaskResourceProfile])
+  }
+
   def addResourceProfile(rp: ResourceProfile): Unit = {
     isSupported(rp)
     var putNewProfile = false
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
index 58b37269be4..0e18ecf0e51 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
@@ -356,7 +356,7 @@ private[spark] object ResourceUtils extends Logging {
     val fileAllocated = parseAllocated(resourcesFileOpt, componentName)
     val fileAllocResMap = fileAllocated.map(a => (a.id.resourceName, 
a.toResourceInformation)).toMap
     // only want to look at the ResourceProfile for resources not in the 
resources file
-    val execReq = ResourceProfile.getCustomExecutorResources(resourceProfile)
+    val execReq = resourceProfile.getCustomExecutorResources()
     val filteredExecreq = execReq.filterNot { case (rname, _) => 
fileAllocResMap.contains(rname) }
     val rpAllocations = filteredExecreq.map { case (rName, execRequest) =>
       val resourceId = new ResourceID(componentName, rName)
@@ -444,8 +444,8 @@ private[spark] object ResourceUtils extends Logging {
         maxTaskPerExec = numTasksPerExecCores
       }
     }
-    val taskReq = ResourceProfile.getCustomTaskResources(rp)
-    val execReq = ResourceProfile.getCustomExecutorResources(rp)
+    val taskReq = rp.getCustomTaskResources()
+    val execReq = rp.getCustomExecutorResources()
 
     if (limitingResource.nonEmpty && 
!limitingResource.equals(ResourceProfile.CPUS)) {
       if ((taskCpus * maxTaskPerExec) < cores) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 475afd01d00..86786e64ced 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -44,7 +44,7 @@ import org.apache.spark.network.shuffle.protocol.MergeStatuses
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.partial.{ApproximateActionListener, 
ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd.{RDD, RDDCheckpointData}
-import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.resource.{ResourceProfile, TaskResourceProfile}
 import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, 
EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
 import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.storage._
@@ -592,7 +592,12 @@ private[spark] class DAGScheduler(
         if (x.amount > v.amount) x else v).getOrElse(v)
       k -> larger
     }
-    new ResourceProfile(mergedExecReq, mergedTaskReq)
+
+    if (mergedExecReq.isEmpty) {
+      new TaskResourceProfile(mergedTaskReq)
+    } else {
+      new ResourceProfile(mergedExecReq, mergedTaskReq)
+    }
   }
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d3e27a94e29..a6735f380f1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -388,9 +388,10 @@ private[spark] class TaskSchedulerImpl(
       val execId = shuffledOffers(i).executorId
       val host = shuffledOffers(i).host
       val taskSetRpID = taskSet.taskSet.resourceProfileId
-      // make the resource profile id a hard requirement for now - ie only put 
tasksets
-      // on executors where resource profile exactly matches.
-      if (taskSetRpID == shuffledOffers(i).resourceProfileId) {
+
+      // check whether the task can be scheduled to the executor base on 
resource profile.
+      if (sc.resourceProfileManager
+        .canBeScheduled(taskSetRpID, shuffledOffers(i).resourceProfileId)) {
         val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, 
availableCpus(i),
           availableResources(i))
         taskResAssignmentsOpt.foreach { taskResAssignments =>
@@ -463,7 +464,7 @@ private[spark] class TaskSchedulerImpl(
     // check if the ResourceProfile has cpus first since that is common case
     if (availCpus < taskCpus) return None
     // only look at the resource other then cpus
-    val tsResources = ResourceProfile.getCustomTaskResources(taskSetProf)
+    val tsResources = taskSetProf.getCustomTaskResources()
     if (tsResources.isEmpty) return Some(Map.empty)
     val localTaskReqAssign = HashMap[String, ResourceInformation]()
     // we go through all resources here so that we can make sure they match 
and also get what the
@@ -1222,13 +1223,13 @@ private[spark] object TaskSchedulerImpl {
 
   /**
    * Calculate the max available task slots given the `availableCpus` and 
`availableResources`
-   * from a collection of ResourceProfiles. And only those ResourceProfiles 
who has the
-   * same id with the `rpId` can be used to calculate the task slots.
+   * from a collection of ResourceProfiles. And only those ResourceProfiles 
who can be assigned
+   * tasks with the `rpId` can be used to calculate the task slots.
    *
    * @param scheduler the TaskSchedulerImpl instance
    * @param conf SparkConf used to calculate the limiting resource and get the 
cpu amount per task
-   * @param rpId the target ResourceProfile id. Only those ResourceProfiles 
who has the same id
-   *             with it can be used to calculate the task slots.
+   * @param rpId the ResourceProfile id for the task set. Only those 
ResourceProfiles who can be
+   *             assigned with the tasks can be used to calculate the task 
slots.
    * @param availableRPIds an Array of ids of the available ResourceProfiles 
from the executors.
    * @param availableCpus an Array of the amount of available cpus from the 
executors.
    * @param availableResources an Array of the resources map from the 
executors. In the resource
@@ -1257,7 +1258,7 @@ private[spark] object TaskSchedulerImpl {
     val taskLimit = 
resourceProfile.taskResources.get(limitingResource).map(_.amount).get
 
     availableCpus.zip(availableResources).zip(availableRPIds)
-      .filter { case (_, id) => id == rpId }
+      .filter { case (_, id) => 
scheduler.sc.resourceProfileManager.canBeScheduled(rpId, id) }
       .map { case ((cpu, resources), _) =>
         val numTasksPerExecCores = cpu / cpusPerTask
         if (limitedByCpu) {
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index aa008135609..e97d5c7883a 100644
--- 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -116,6 +116,43 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     assert(rpmanager.isSupported(immrprof))
   }
 
+  test("isSupported task resource profiles with dynamic allocation disabled") {
+    val conf = new SparkConf().setMaster("spark://foo").set(EXECUTOR_CORES, 4)
+    conf.set(DYN_ALLOCATION_ENABLED, false)
+    conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
+
+    var rpmanager = new ResourceProfileManager(conf, listenerBus)
+    // default profile should always work
+    val defaultProf = rpmanager.defaultResourceProfile
+    assert(rpmanager.isSupported(defaultProf))
+
+    // task resource profile.
+    val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1)
+    val taskProf = new TaskResourceProfile(gpuTaskReq.requests)
+    assert(rpmanager.isSupported(taskProf))
+
+    conf.setMaster("local")
+    rpmanager = new ResourceProfileManager(conf, listenerBus)
+    val error = intercept[SparkException] {
+      rpmanager.isSupported(taskProf)
+    }.getMessage
+    assert(error === "TaskResourceProfiles are only supported for Standalone " 
+
+      "cluster for now when dynamic allocation is disabled.")
+  }
+
+  test("isSupported task resource profiles with dynamic allocation enabled") {
+    val conf = new SparkConf().setMaster("spark://foo").set(EXECUTOR_CORES, 4)
+    conf.set(DYN_ALLOCATION_ENABLED, true)
+    conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
+
+    val rpmanager = new ResourceProfileManager(conf, listenerBus)
+
+    // task resource profile.
+    val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1)
+    val taskProf = new TaskResourceProfile(gpuTaskReq.requests)
+    assert(rpmanager.isSupported(taskProf))
+  }
+
   test("isSupported with local mode") {
     val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
     conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
index 6c36f5c8555..d07b85847e7 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala
@@ -17,12 +17,15 @@
 
 package org.apache.spark.resource
 
-import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.mockito.Mockito.when
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY
 import org.apache.spark.resource.TestResourceIDs._
 
-class ResourceProfileSuite extends SparkFunSuite {
+class ResourceProfileSuite extends SparkFunSuite with MockitoSugar {
 
   override def beforeAll(): Unit = {
     try {
@@ -190,6 +193,33 @@ class ResourceProfileSuite extends SparkFunSuite {
     assert(immrprof.isCoresLimitKnown == true)
   }
 
+  test("tasks and limit resource for task resource profile") {
+    val sparkConf = new SparkConf().setMaster("spark://testing")
+      .set(EXECUTOR_CORES, 2)
+      .set("spark.dynamicAllocation.enabled", "false")
+      .set("spark.executor.resource.gpu.amount", "2")
+      .set("spark.executor.resource.gpu.discoveryScript", "myscript")
+
+    withMockSparkEnv(sparkConf) {
+      val rpBuilder1 = new ResourceProfileBuilder()
+      val rp1 = rpBuilder1
+        .require(new TaskResourceRequests().resource("gpu", 1))
+        .build()
+      assert(rp1.isInstanceOf[TaskResourceProfile])
+      assert(rp1.limitingResource(sparkConf) == ResourceProfile.CPUS)
+      assert(rp1.maxTasksPerExecutor(sparkConf) == 2)
+      assert(rp1.isCoresLimitKnown)
+
+      val rpBuilder2 = new ResourceProfileBuilder()
+      val rp2 = rpBuilder2
+        .require(new TaskResourceRequests().resource("gpu", 2))
+        .build()
+      assert(rp1.isInstanceOf[TaskResourceProfile])
+      assert(rp2.limitingResource(sparkConf) == "gpu")
+      assert(rp2.maxTasksPerExecutor(sparkConf) == 1)
+      assert(rp2.isCoresLimitKnown)
+    }
+  }
 
   test("Create ResourceProfile") {
     val rprof = new ResourceProfileBuilder()
@@ -257,6 +287,22 @@ class ResourceProfileSuite extends SparkFunSuite {
     assert(rprof.resourcesEqual(rprof2), "resource profile resourcesEqual not 
working")
   }
 
+  test("test TaskResourceProfiles equal") {
+    val rprofBuilder = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    rprofBuilder.require(taskReq)
+    val rprof = rprofBuilder.build()
+
+    val taskReq1 = new TaskResourceRequests().resource("gpu", 1)
+    val rprof1 = new ResourceProfile(Map.empty, taskReq1.requests)
+    assert(!rprof.resourcesEqual(rprof1),
+      "resource profiles having different types should not equal")
+
+    val taskReq2 = new TaskResourceRequests().resource("gpu", 1)
+    val rprof2 = new TaskResourceProfile(taskReq2.requests)
+    assert(rprof.resourcesEqual(rprof2), "task resource profile resourcesEqual 
not working")
+  }
+
   test("Test ExecutorResourceRequests memory helpers") {
     val rprof = new ResourceProfileBuilder()
     val ereqs = new ExecutorResourceRequests()
@@ -314,7 +360,7 @@ class ResourceProfileSuite extends SparkFunSuite {
     // Update this if new resource type added
     assert(ResourceProfile.allSupportedExecutorResources.size === 5,
       "Executor resources should have 5 supported resources")
-    assert(ResourceProfile.getCustomExecutorResources(rprof.build).size === 1,
+    assert(rprof.build().getCustomExecutorResources().size === 1,
       "Executor resources should have 1 custom resource")
   }
 
@@ -327,7 +373,18 @@ class ResourceProfileSuite extends SparkFunSuite {
       .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072")
     rprof.require(taskReq).require(eReq)
 
-    assert(ResourceProfile.getCustomTaskResources(rprof.build).size === 1,
+    assert(rprof.build().getCustomTaskResources().size === 1,
       "Task resources should have 1 custom resource")
   }
+
+  private def withMockSparkEnv(conf: SparkConf)(f: => Unit): Unit = {
+    val previousEnv = SparkEnv.get
+    val mockEnv = mock[SparkEnv]
+    when(mockEnv.conf).thenReturn(conf)
+    SparkEnv.set(mockEnv)
+
+    try f finally {
+      SparkEnv.set(previousEnv)
+    }
+  }
 }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 847e0622213..19a9af86afb 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests
 import org.apache.spark.network.shuffle.ExternalBlockStoreClient
 import org.apache.spark.rdd.{DeterministicLevel, RDD}
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, 
ResourceProfileBuilder, TaskResourceRequests}
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, 
ResourceProfileBuilder, TaskResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -3424,6 +3424,23 @@ class DAGSchedulerSuite extends SparkFunSuite with 
TempLocalSparkContext with Ti
     assert(mergedRp.getExecutorCores.get == 4)
   }
 
+  test("test merge task resource profiles") {
+    conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true")
+    // Ensure the initialization of SparkEnv
+    sc
+
+    val treqs1 = new TaskResourceRequests().cpus(1)
+    val rp1 = new TaskResourceProfile(treqs1.requests)
+    val treqs2 = new TaskResourceRequests().cpus(1)
+    val rp2 = new TaskResourceProfile(treqs2.requests)
+    val treqs3 = new TaskResourceRequests().cpus(2)
+    val rp3 = new TaskResourceProfile(treqs3.requests)
+    val mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, 
rp3))
+
+    assert(mergedRp.isInstanceOf[TaskResourceProfile])
+    assert(mergedRp.getTaskCpus.get == 2)
+  }
+
   /**
    * Checks the DAGScheduler's internal logic for traversing an RDD DAG by 
making sure that
    * getShuffleDependenciesAndResourceProfiles correctly returns the direct 
shuffle dependencies
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 869a7232437..4e9e9755e85 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -33,7 +33,7 @@ import org.scalatestplus.mockito.MockitoSugar
 
 import org.apache.spark._
 import org.apache.spark.internal.config
-import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, 
TaskResourceRequests}
+import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, 
TaskResourceProfile, TaskResourceRequests}
 import org.apache.spark.resource.ResourceUtils._
 import org.apache.spark.resource.TestResourceIDs._
 import org.apache.spark.util.{Clock, ManualClock, ThreadUtils}
@@ -1833,6 +1833,101 @@ class TaskSchedulerImplSuite extends SparkFunSuite with 
LocalSparkContext
     assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
   }
 
+  test("Scheduler works with task resource profiles") {
+    val taskCpus = 1
+    val taskGpus = 1
+    val executorGpus = 4
+    val executorCpus = 4
+
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+
+    val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+    val taskSet = FakeTask.createTaskSet(3)
+    val rpTaskSet = FakeTask.createTaskSet(5, stageId = 1, stageAttemptId = 0,
+      priority = 0, rpId = rp.id)
+
+    val resources0 = Map(GPU -> ArrayBuffer("0", "1", "2", "3"))
+    val resources1 = Map(GPU -> ArrayBuffer("4", "5", "6", "7"))
+
+    val workerOffers =
+      IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0),
+        WorkerOffer("executor1", "host1", 4, None, resources1))
+
+    taskScheduler.submitTasks(taskSet)
+    taskScheduler.submitTasks(rpTaskSet)
+    // should have 3 for default profile and 2 for additional resource profile
+    var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+    assert(5 === taskDescriptions.length)
+    var has2Gpus = 0
+    var has1Gpu = 0
+    for (tDesc <- taskDescriptions) {
+      assert(tDesc.resources.contains(GPU))
+      if (tDesc.resources(GPU).addresses.size == 2) {
+        has2Gpus += 1
+      }
+      if (tDesc.resources(GPU).addresses.size == 1) {
+        has1Gpu += 1
+      }
+    }
+    assert(has2Gpus == 2)
+    assert(has1Gpu == 3)
+
+    val resources3 = Map(GPU -> ArrayBuffer("8", "9", "10", "11"))
+
+    // clear the first 2 worker offers so they don't have any room and add a 
third
+    // for the resource profile
+    val workerOffers3 = IndexedSeq(
+      WorkerOffer("executor0", "host0", 0, None, Map.empty),
+      WorkerOffer("executor1", "host1", 0, None, Map.empty),
+      WorkerOffer("executor2", "host2", 4, None, resources3))
+    taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten
+    assert(2 === taskDescriptions.length)
+    assert(taskDescriptions.head.resources.contains(GPU))
+    assert(2 == taskDescriptions.head.resources(GPU).addresses.size)
+  }
+
+  test("Calculate available tasks slots for task resource profiles") {
+    val taskCpus = 1
+    val taskGpus = 1
+    val executorGpus = 4
+    val executorCpus = 4
+
+    val taskScheduler = setupScheduler(numCores = executorCpus,
+      config.CPUS_PER_TASK.key -> taskCpus.toString,
+      TASK_GPU_ID.amountConf -> taskGpus.toString,
+      EXECUTOR_GPU_ID.amountConf -> executorGpus.toString,
+      config.EXECUTOR_CORES.key -> executorCpus.toString
+    )
+
+    val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2)
+    val rp = new TaskResourceProfile(treqs.requests)
+    taskScheduler.sc.resourceProfileManager.addResourceProfile(rp)
+
+    val resources0 = Map(GPU -> ArrayBuffer("0", "1", "2", "3"))
+    val resources1 = Map(GPU -> ArrayBuffer("4", "5", "6", "7"))
+
+    val workerOffers =
+      IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0),
+        WorkerOffer("executor1", "host1", 4, None, resources1))
+    val availableResourcesAmount = workerOffers.map(_.resources).map { 
resourceMap =>
+        // available addresses already takes into account if there are 
fractional
+        // task resource requests
+        resourceMap.map { case (name, addresses) => (name, addresses.length) }
+      }
+
+    val taskSlotsForRp = TaskSchedulerImpl.calculateAvailableSlots(
+      taskScheduler, taskScheduler.conf, rp.id, 
workerOffers.map(_.resourceProfileId).toArray,
+      workerOffers.map(_.cores).toArray, availableResourcesAmount.toArray)
+    assert(taskSlotsForRp === 4)
+  }
+
   private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int): 
TaskSchedulerImpl = {
     // one task per host
     val numHosts = numTasks
diff --git a/docs/configuration.md b/docs/configuration.md
index 55e595ad301..ffd36209e2d 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3243,9 +3243,9 @@ See your cluster manager specific page for requirements 
and details on each of -
 # Stage Level Scheduling Overview
 
 The stage level scheduling feature allows users to specify task and executor 
resource requirements at the stage level. This allows for different stages to 
run with executors that have different resources. A prime example of this is 
one ETL stage runs with executors with just CPUs, the next stage is an ML stage 
that needs GPUs. Stage level scheduling allows for user to request different 
executors that have GPUs when the ML stage runs rather then having to acquire 
executors with GPUs at th [...]
-This is only available for the RDD API in Scala, Java, and Python.  It is 
available on YARN, Kubernetes and Standalone when dynamic allocation is 
enabled. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) 
page or 
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page 
or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for 
more implementation details.
+This is only available for the RDD API in Scala, Java, and Python.  It is 
available on YARN, Kubernetes and Standalone when dynamic allocation is 
enabled. When dynamic allocation is disabled, it allows users to specify 
different task resource requirements at stage level, and this is supported on 
Standalone cluster right now. See the 
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or 
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page 
or [Standa [...]
 
-See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this 
feature. The current implementation acquires new executors for each 
`ResourceProfile`  created and currently has to be an exact match. Spark does 
not try to fit tasks into an executor that require a different ResourceProfile 
than the executor was created with. Executors that are not in use will idle 
timeout with the dynamic allocation logic. The default configuration for this 
feature is to only allow one Resour [...]
+See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this 
feature. When dynamic allocation is disabled, tasks with different task 
resource requirements will share executors with `DEFAULT_RESOURCE_PROFILE`. 
While when dynamic allocation is enabled, the current implementation acquires 
new executors for each `ResourceProfile`  created and currently has to be an 
exact match. Spark does not try to fit tasks into an executor that require a 
different ResourceProfile than the [...]
 
 # Push-based shuffle overview
 
diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md
index 559e3bca6c9..b431752f166 100644
--- a/docs/spark-standalone.md
+++ b/docs/spark-standalone.md
@@ -467,7 +467,9 @@ worker during one single schedule iteration.
 
 # Stage Level Scheduling Overview
 
-Stage level scheduling is supported on Standalone when dynamic allocation is 
enabled. Currently, when the Master allocates executors for one application, it 
will schedule based on the order of the ResourceProfile ids for multiple 
ResourceProfiles. The ResourceProfile with smaller id will be scheduled 
firstly. Normally this won’t matter as Spark finishes one stage before starting 
another one, the only case this might have an affect is in a job server type 
scenario, so its something to kee [...]
+Stage level scheduling is supported on Standalone:
+- When dynamic allocation is disabled: It allows users to specify different 
task resource requirements at the stage level and will use the same executors 
requested at startup.
+- When dynamic allocation is enabled: Currently, when the Master allocates 
executors for one application, it will schedule based on the order of the 
ResourceProfile ids for multiple ResourceProfiles. The ResourceProfile with 
smaller id will be scheduled firstly. Normally this won’t matter as Spark 
finishes one stage before starting another one, the only case this might have 
an affect is in a job server type scenario, so its something to keep in mind. 
For scheduling, we will only take exe [...]
 
 ## Caveats
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to