This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new e8d7497abad [SPARK-45495][CORE] Support stage level task resource 
profile for k8s cluster when dynamic allocation disabled
e8d7497abad is described below

commit e8d7497abadd0bccb4bd8e615aadc77fc2038566
Author: Bobby Wang <wbo4...@gmail.com>
AuthorDate: Fri Oct 13 10:50:18 2023 -0500

    [SPARK-45495][CORE] Support stage level task resource profile for k8s 
cluster when dynamic allocation disabled
    
    ### What changes were proposed in this pull request?
    This PR is a follow-up to https://github.com/apache/spark/pull/37268, which 
added support for stage-level task resource profiles on standalone clusters 
when dynamic allocation is disabled. This PR enables stage-level task resource 
profiles on Kubernetes clusters as well.
    
    ### Why are the changes needed?
    
    Users running Spark ML/DL workloads on Kubernetes expect the stage-level 
task resource profile feature to be available.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    The existing tests from https://github.com/apache/spark/pull/37268 also 
cover this PR, since Kubernetes and standalone clusters share the same 
TaskSchedulerImpl class, which implements this feature. In addition, the 
existing test was modified to cover the Kubernetes cluster, and I also 
performed some manual tests, which are described in the PR comments.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43323 from wbo4958/k8s-stage-level.
    
    Authored-by: Bobby Wang <wbo4...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
    (cherry picked from commit 632eabdb6dfb78c0a5dc84c01806548e1dc6dd0a)
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../scala/org/apache/spark/resource/ResourceProfileManager.scala | 7 ++++---
 .../org/apache/spark/resource/ResourceProfileManagerSuite.scala  | 9 +++++++--
 docs/configuration.md                                            | 2 +-
 docs/running-on-kubernetes.md                                    | 4 +++-
 4 files changed, 15 insertions(+), 7 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index cd7124a5724..afbacb80136 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -67,9 +67,10 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
    */
   private[spark] def isSupported(rp: ResourceProfile): Boolean = {
     if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
-      if ((notRunningUnitTests || testExceptionThrown) && 
!(isStandaloneOrLocalCluster || isYarn)) {
-        throw new SparkException("TaskResourceProfiles are only supported for 
Standalone and " +
-          "Yarn cluster for now when dynamic allocation is disabled.")
+      if ((notRunningUnitTests || testExceptionThrown) &&
+        !(isStandaloneOrLocalCluster || isYarn || isK8s)) {
+        throw new SparkException("TaskResourceProfiles are only supported for 
Standalone, " +
+          "Yarn and Kubernetes cluster for now when dynamic allocation is 
disabled.")
       }
     } else {
       val isNotDefaultProfile = rp.id != 
ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
diff --git 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index 77dc7bcb4c5..7149267583b 100644
--- 
a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -137,8 +137,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val error = intercept[SparkException] {
       rpmanager.isSupported(taskProf)
     }.getMessage
-    assert(error === "TaskResourceProfiles are only supported for Standalone " 
+
-      "and Yarn cluster for now when dynamic allocation is disabled.")
+    assert(error === "TaskResourceProfiles are only supported for Standalone, 
" +
+      "Yarn and Kubernetes cluster for now when dynamic allocation is 
disabled.")
 
     // Local cluster: supports task resource profile.
     conf.setMaster("local-cluster[1, 1, 1024]")
@@ -149,6 +149,11 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     conf.setMaster("yarn")
     rpmanager = new ResourceProfileManager(conf, listenerBus)
     assert(rpmanager.isSupported(taskProf))
+
+    // K8s: supports task resource profile.
+    conf.setMaster("k8s://foo")
+    rpmanager = new ResourceProfileManager(conf, listenerBus)
+    assert(rpmanager.isSupported(taskProf))
   }
 
   test("isSupported task resource profiles with dynamic allocation enabled") {
diff --git a/docs/configuration.md b/docs/configuration.md
index 74ddd6df023..4b0b9b3e3c2 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3670,7 +3670,7 @@ See your cluster manager specific page for requirements 
and details on each of -
 # Stage Level Scheduling Overview
 
 The stage level scheduling feature allows users to specify task and executor 
resource requirements at the stage level. This allows for different stages to 
run with executors that have different resources. A prime example of this is 
one ETL stage runs with executors with just CPUs, the next stage is an ML stage 
that needs GPUs. Stage level scheduling allows for user to request different 
executors that have GPUs when the ML stage runs rather then having to acquire 
executors with GPUs at th [...]
-This is only available for the RDD API in Scala, Java, and Python.  It is 
available on YARN, Kubernetes and Standalone when dynamic allocation is 
enabled. When dynamic allocation is disabled, it allows users to specify 
different task resource requirements at stage level, and this is supported on 
YARN and Standalone cluster right now. See the 
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or 
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page o 
[...]
+This is only available for the RDD API in Scala, Java, and Python.  It is 
available on YARN, Kubernetes and Standalone when dynamic allocation is 
enabled. When dynamic allocation is disabled, it allows users to specify 
different task resource requirements at stage level, and this is supported on 
YARN, Kubernetes and Standalone cluster right now. See the 
[YARN](running-on-yarn.html#stage-level-scheduling-overview) page or 
[Kubernetes](running-on-kubernetes.html#stage-level-scheduling-over [...]
 
 See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this 
feature. When dynamic allocation is disabled, tasks with different task 
resource requirements will share executors with `DEFAULT_RESOURCE_PROFILE`. 
While when dynamic allocation is enabled, the current implementation acquires 
new executors for each `ResourceProfile`  created and currently has to be an 
exact match. Spark does not try to fit tasks into an executor that require a 
different ResourceProfile than the [...]
 
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 707a76196f3..38a745f1afc 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1936,5 +1936,7 @@ With the above configuration, the job will be scheduled 
by YuniKorn scheduler in
 
 ### Stage Level Scheduling Overview
 
-Stage level scheduling is supported on Kubernetes when dynamic allocation is 
enabled. This also requires 
<code>spark.dynamicAllocation.shuffleTracking.enabled</code> to be enabled 
since Kubernetes doesn't support an external shuffle service at this time. The 
order in which containers for different profiles is requested from Kubernetes 
is not guaranteed. Note that since dynamic allocation on Kubernetes requires 
the shuffle tracking feature, this means that executors from previous stages t 
[...]
+Stage level scheduling is supported on Kubernetes:
+- When dynamic allocation is disabled: It allows users to specify different 
task resource requirements at the stage level and will use the same executors 
requested at startup.
+- When dynamic allocation is enabled: It allows users to specify task and 
executor resource requirements at the stage level and will request the extra 
executors. This also requires 
<code>spark.dynamicAllocation.shuffleTracking.enabled</code> to be enabled 
since Kubernetes doesn't support an external shuffle service at this time. The 
order in which containers for different profiles is requested from Kubernetes 
is not guaranteed. Note that since dynamic allocation on Kubernetes requires th 
[...]
 Note, there is a difference in the way pod template resources are handled 
between the base default profile and custom ResourceProfiles. Any resources 
specified in the pod template file will only be used with the base default 
profile. If you create custom ResourceProfiles be sure to include all necessary 
resources there since the resources from the template file will not be 
propagated to custom ResourceProfiles.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to