This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 1f81e26e038 [SPARK-46006][YARN] YarnAllocator miss clean 
targetNumExecutorsPerResourceProfileId after YarnSchedulerBackend call stop
1f81e26e038 is described below

commit 1f81e26e03803238ee6292762bcbee49e1a7c066
Author: Angerszhuuuu <angers....@gmail.com>
AuthorDate: Wed Nov 22 16:50:21 2023 +0800

    [SPARK-46006][YARN] YarnAllocator miss clean 
targetNumExecutorsPerResourceProfileId after YarnSchedulerBackend call stop
    
    ### What changes were proposed in this pull request?
    We meet a case that user call sc.stop() after run all custom code, but 
stuck in some place.
    
    Cause below situation
    
    1. User call sc.stop()
    2. sc.stop() stuck in some process, but SchedulerBackend.stop was called
    3. Since yarn ApplicationMaster didn't finish, still call 
YarnAllocator.allocateResources()
    4. Since driver endpoint stop new allocated executor failed to register
    5. untll trigger Max number of executor failures
    6. Caused by
    
    Before call CoarseGrainedSchedulerBackend.stop() will call 
YarnSchedulerBackend.requestTotalExecutor() to clean request info
    
![image](https://github.com/apache/spark/assets/46485123/4a61fb40-5986-4ecc-9329-369187d5311d)
    
    When YarnAllocator handle then empty resource request,  since 
resourceTotalExecutorsWithPreferedLocalities is empty, miss clean 
targetNumExecutorsPerResourceProfileId.
    
![image](https://github.com/apache/spark/assets/46485123/0133f606-e1d7-4db7-95fe-140c61379102)
    
    ### Why are the changes needed?
    Fix bug
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43906 from AngersZhuuuu/SPARK-46006.
    
    Authored-by: Angerszhuuuu <angers....@gmail.com>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit 06635e25f170e61f6cfe53232d001993ec7d376d)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 28 +++++++++++++---------
 1 file changed, 17 insertions(+), 11 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 19c06f95731..f14fc9d5de4 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -384,19 +384,25 @@ private[yarn] class YarnAllocator(
     this.numLocalityAwareTasksPerResourceProfileId = 
numLocalityAwareTasksPerResourceProfileId
     this.hostToLocalTaskCountPerResourceProfileId = 
hostToLocalTaskCountPerResourceProfileId
 
-    val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
-      createYarnResourceForResourceProfile(rp)
-      if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
-        logInfo(s"Driver requested a total number of $numExecs executor(s) " +
-          s"for resource profile id: ${rp.id}.")
-        targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
-        allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes)
-        true
-      } else {
-        false
+    if (resourceProfileToTotalExecs.isEmpty) {
+      targetNumExecutorsPerResourceProfileId.clear()
+      allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes)
+      true
+    } else {
+      val res = resourceProfileToTotalExecs.map { case (rp, numExecs) =>
+        createYarnResourceForResourceProfile(rp)
+        if (numExecs != getOrUpdateTargetNumExecutorsForRPId(rp.id)) {
+          logInfo(s"Driver requested a total number of $numExecs executor(s) " 
+
+            s"for resource profile id: ${rp.id}.")
+          targetNumExecutorsPerResourceProfileId(rp.id) = numExecs
+          allocatorNodeHealthTracker.setSchedulerExcludedNodes(excludedNodes)
+          true
+        } else {
+          false
+        }
       }
+      res.exists(_ == true)
     }
-    res.exists(_ == true)
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to