This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 973e31eb2 [CELEBORN-1568] Support worker retries in MiniCluster
973e31eb2 is described below

commit 973e31eb20c7eefc17bdc2c31a938b85f5127eb3
Author: sychen <[email protected]>
AuthorDate: Tue Aug 20 10:46:21 2024 +0800

    [CELEBORN-1568] Support worker retries in MiniCluster
    
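    ### What changes were proposed in this pull request?
    
    - Change the worker start retry backoff in `MiniClusterFeature` from
      `math.pow(2000, workerStartRetry)` milliseconds to `2^workerStartRetry`
      seconds (2 s, 4 s, 8 s, ...).
    - Catch `Throwable` instead of `Exception` while waiting for all workers to
      register, so that a failed `assert(worker.registered.get())` (an
      `AssertionError`, which is not an `Exception`) also goes through the
      wait-and-retry path.
    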
    ### Why are the changes needed?
    
https://github.com/apache/celeborn/actions/runs/10417785546/job/28852691241#step:4:5804
    
    With the current worker retry logic, the sleep before the first retry is
2000 ms, before the second 4,000,000 ms (over an hour), and before the third
8,000,000,000 ms (which `.toInt` clamps to `Int.MaxValue`, about 24.8 days).
With backoff delays this long, the retry effectively never completes.
    
    ```scala
    Thread.sleep(math.pow(2000, workerStartRetry).toInt)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    GA (GitHub Actions CI)
    
    Closes #2692 from cxzl25/CELEBORN-1568.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../org/apache/celeborn/service/deploy/MiniClusterFeature.scala    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index a87015d21..1f20b71b6 100644
--- a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy
 import java.io.IOException
 import java.net.BindException
 import java.nio.file.Files
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.{Lock, ReentrantLock}
 
 import scala.collection.mutable
@@ -202,12 +203,12 @@ trait MiniClusterFeature extends Logging {
                 logError(s"cannot start worker $i, reached to max retrying", ex)
                 throw ex
               } else {
-                Thread.sleep(math.pow(2000, workerStartRetry).toInt)
+                TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong)
               }
           }
         }
       })
-      workerThread.setName(s"worker ${i} starter thread")
+      workerThread.setName(s"worker $i starter thread")
       workerThread
     }
     threads.foreach(_.start())
@@ -228,7 +229,7 @@ trait MiniClusterFeature extends Logging {
         workerInfos.foreach { case (worker, _) => assert(worker.registered.get()) }
         allWorkersStarted = true
       } catch {
-        case ex: Exception =>
+        case ex: Throwable =>
           logError("all workers haven't been started retrying", ex)
           Thread.sleep(5000)
           workersWaitingTime += 5000

Reply via email to