This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b80639e643 [SPARK-45250][CORE] Support stage level task resource 
profile for yarn cluster when dynamic allocation disabled
5b80639e643 is described below

commit 5b80639e643b6dd09dd64c3f43ec039b2ef2f9fd
Author: Bobby Wang <wbo4...@gmail.com>
AuthorDate: Mon Oct 2 23:00:56 2023 -0500

    [SPARK-45250][CORE] Support stage level task resource profile for yarn 
cluster when dynamic allocation disabled
    
    ### What changes were proposed in this pull request?
    This PR is a follow-up of https://github.com/apache/spark/pull/37268, which 
added support for stage-level task resource profiles on standalone clusters 
when dynamic allocation is disabled. This PR enables stage-level task resource 
profiles on YARN clusters as well.
    
    ### Why are the changes needed?
    
    Users who run Spark ML/DL workloads on YARN would expect the 
stage-level task resource profile feature to be available there.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    The existing tests from https://github.com/apache/spark/pull/37268 also 
cover this PR, since both YARN and standalone clusters share the same 
TaskSchedulerImpl class, which implements this feature. In addition, the 
existing test was modified to cover the YARN cluster, and I also performed 
some manual tests, which are described in the PR comments.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43030 from wbo4958/yarn-task-resoure-profile.
    
    Authored-by: Bobby Wang <wbo4...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/resource/ResourceProfileManager.scala    |  6 +++---
 .../spark/resource/ResourceProfileManagerSuite.scala      | 15 +++++++++++++--
 docs/configuration.md                                     |  2 +-
 docs/running-on-yarn.md                                   |  6 +++++-
 4 files changed, 22 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 9f98d4d9c9c..cd7124a5724 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -67,9 +67,9 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
-      if ((notRunningUnitTests || testExceptionThrown) && 
!isStandaloneOrLocalCluster) {
-        throw new SparkException("TaskResourceProfiles are only supported for 
Standalone " +
-          "cluster for now when dynamic allocation is disabled.")
+      if ((notRunningUnitTests || testExceptionThrown) && 
!(isStandaloneOrLocalCluster || isYarn)) {
+        throw new SparkException("TaskResourceProfiles are only supported for 
Standalone and " +
+          "Yarn cluster for now when dynamic allocation is disabled.")
       }
     } else {
       val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index e97d5c7883a..77dc7bcb4c5 100644
--- 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -126,18 +126,29 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val defaultProf = rpmanager.defaultResourceProfile
     assert(rpmanager.isSupported(defaultProf))
 
-    // task resource profile.
+    // Standalone: supports task resource profile.
     val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1)
     val taskProf = new TaskResourceProfile(gpuTaskReq.requests)
     assert(rpmanager.isSupported(taskProf))
 
+    // Local: doesn't support task resource profile.
     conf.setMaster("local")
     rpmanager = new ResourceProfileManager(conf, listenerBus)
     val error = intercept[SparkException] {
       rpmanager.isSupported(taskProf)
     }.getMessage
     assert(error === "TaskResourceProfiles are only supported for Standalone " 
+
-      "cluster for now when dynamic allocation is disabled.")
+      "and Yarn cluster for now when dynamic allocation is disabled.")
+
+    // Local cluster: supports task resource profile.
+    conf.setMaster("local-cluster[1, 1, 1024]")
+    rpmanager = new ResourceProfileManager(conf, listenerBus)
+    assert(rpmanager.isSupported(taskProf))
+
+    // Yarn: supports task resource profile.
+    conf.setMaster("yarn")
+    rpmanager = new ResourceProfileManager(conf, listenerBus)
+    assert(rpmanager.isSupported(taskProf))
   }
 
   test("isSupported task resource profiles with dynamic allocation enabled") {
diff --git a/docs/configuration.md b/docs/configuration.md
index 39e02f36af0..e46ead8f2fc 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3668,7 +3668,7 @@ See your cluster manager specific page for requirements 
and details on each of -
 # Stage Level Scheduling Overview
 
 The stage level scheduling feature allows users to specify task and executor 
resource requirements at the stage level. This allows for different stages to 
run with executors that have different resources. A prime example of this is 
one ETL stage runs with executors with just CPUs, the next stage is an ML stage 
that needs GPUs. Stage level scheduling allows for user to request different 
executors that have GPUs when the ML stage runs rather then having to acquire 
executors with GPUs at th [...]
-This is only available for the RDD API in Scala, Java, and Python.  It is 
available on YARN, Kubernetes and Standalone when dynamic allocation is 
enabled. When dynamic allocation is disabled, it allows users to specify 
different task resource requirements at stage level, and this is supported on 
Standalone cluster right now. See the 
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or 
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page 
or [Standa [...]
+This is only available for the RDD API in Scala, Java, and Python.  It is 
available on YARN, Kubernetes and Standalone when dynamic allocation is 
enabled. When dynamic allocation is disabled, it allows users to specify 
different task resource requirements at stage level, and this is supported on 
YARN and Standalone cluster right now. See the 
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or 
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page o 
[...]
 
 See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this 
feature. When dynamic allocation is disabled, tasks with different task 
resource requirements will share executors with `DEFAULT_RESOURCE_PROFILE`. 
While when dynamic allocation is enabled, the current implementation acquires 
new executors for each `ResourceProfile`  created and currently has to be an 
exact match. Spark does not try to fit tasks into an executor that require a 
different ResourceProfile than the [...]
 
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 709cffda9b0..c9a0af56a5a 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -759,7 +759,11 @@ YARN does not tell Spark the addresses of the resources 
allocated to each contai
 
 # Stage Level Scheduling Overview
 
-Stage level scheduling is supported on YARN when dynamic allocation is 
enabled. One thing to note that is YARN specific is that each ResourceProfile 
requires a different container priority on YARN. The mapping is simply the 
ResourceProfile id becomes the priority, on YARN lower numbers are higher 
priority. This means that profiles created earlier will have a higher priority 
in YARN. Normally this won't matter as Spark finishes one stage before starting 
another one, the only case this mig [...]
+Stage level scheduling is supported on YARN:
+- When dynamic allocation is disabled: It allows users to specify different 
task resource requirements at the stage level and will use the same executors 
requested at startup.
+- When dynamic allocation is enabled: It allows users to specify task and 
executor resource requirements at the stage level and will request the extra 
executors.
+
+One thing to note that is YARN specific is that each ResourceProfile requires 
a different container priority on YARN. The mapping is simply the 
ResourceProfile id becomes the priority, on YARN lower numbers are higher 
priority. This means that profiles created earlier will have a higher priority 
in YARN. Normally this won't matter as Spark finishes one stage before starting 
another one, the only case this might have an affect is in a job server type 
scenario, so its something to keep in mind.
 Note there is a difference in the way custom resources are handled between the 
base default profile and custom ResourceProfiles. To allow for the user to 
request YARN containers with extra resources without Spark scheduling on them, 
the user can specify resources via the 
<code>spark.yarn.executor.resource.</code> config. Those configs are only used 
in the base default profile though and do not get propagated into any other 
custom ResourceProfiles. This is because there would be no way to [...]
 
 # Important notes


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to