This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 222ed267b [CELEBORN-692] WorkerStatusTracker should handle WORKER_SHUTDOWN properly
222ed267b is described below

commit 222ed267b0db779af60e6925a0f117fab8d7ea61
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Mon Jun 19 15:54:45 2023 +0800

    [CELEBORN-692] WorkerStatusTracker should handle WORKER_SHUTDOWN properly
    
    ### What changes were proposed in this pull request?
    This PR puts workers with WORKER_SHUTDOWN status into shuttingWorkers instead of the blacklist.
    
    ### Why are the changes needed?
    If WORKER_SHUTDOWN workers are put into the blacklist, committing their files will not be triggered; see ```CommitHandler::commitFiles```.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #1603 from waitinfuture/692.
    
    Authored-by: zky.zhoukeyong <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../celeborn/client/WorkerStatusTracker.scala      | 22 ++++++++++------------
 1 file changed, 10 insertions(+), 12 deletions(-)

diff --git a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
index 602a2b5f0..99c558371 100644
--- a/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/WorkerStatusTracker.scala
@@ -122,19 +122,17 @@ class WorkerStatusTracker(
            |Current blacklist:
            |$blacklistMsg
                """.stripMargin)
-      failedWorker.asScala.foreach { case (worker, (statusCode, registerTime)) 
=>
-        if (!blacklist.containsKey(worker)) {
+      failedWorker.asScala.foreach {
+        case (worker, (StatusCode.WORKER_SHUTDOWN, _)) =>
+          shuttingWorkers.add(worker)
+        case (worker, (statusCode, registerTime)) if 
!blacklist.containsKey(worker) =>
           blacklist.put(worker, (statusCode, registerTime))
-        } else {
-          statusCode match {
-            case StatusCode.WORKER_SHUTDOWN |
-                StatusCode.NO_AVAILABLE_WORKING_DIR |
-                StatusCode.RESERVE_SLOTS_FAILED |
-                StatusCode.UNKNOWN_WORKER =>
-              blacklist.put(worker, (statusCode, blacklist.get(worker)._2))
-            case _ => // Not cover
-          }
-        }
+        case (worker, (statusCode, _))
+            if statusCode == StatusCode.NO_AVAILABLE_WORKING_DIR ||
+              statusCode == StatusCode.RESERVE_SLOTS_FAILED ||
+              statusCode == StatusCode.UNKNOWN_WORKER =>
+          blacklist.put(worker, (statusCode, blacklist.get(worker)._2))
+        case _ => // Not cover
       }
     }
   }

Reply via email to