This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new b5ac49f94 [CELEBORN-1568] Support worker retries in MiniCluster
b5ac49f94 is described below
commit b5ac49f94c1d968a0babaac66d72297c33c73f77
Author: sychen <[email protected]>
AuthorDate: Tue Aug 20 10:46:21 2024 +0800
[CELEBORN-1568] Support worker retries in MiniCluster
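### What changes were proposed in this pull request?
Sleep for `2^workerStartRetry` seconds between worker start retries in MiniCluster
instead of `2000^workerStartRetry` milliseconds, and catch `Throwable` rather than
`Exception` while waiting for all workers to register.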
### Why are the changes needed?
https://github.com/apache/celeborn/actions/runs/10417785546/job/28852691241#step:4:5804
With the current worker retry logic, the first sleep is 2000 milliseconds, the second
is 4000000 milliseconds, and the third would be 8000000000 milliseconds, so the
retries are unlikely to ever complete.
```scala
Thread.sleep(math.pow(2000, workerStartRetry).toInt)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #2692 from cxzl25/CELEBORN-1568.
Authored-by: sychen <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit 973e31eb20c7eefc17bdc2c31a938b85f5127eb3)
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/service/deploy/MiniClusterFeature.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index a87015d21..1f20b71b6 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy
import java.io.IOException
import java.net.BindException
import java.nio.file.Files
+import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.{Lock, ReentrantLock}
import scala.collection.mutable
@@ -202,12 +203,12 @@ trait MiniClusterFeature extends Logging {
logError(s"cannot start worker $i, reached to max retrying",
ex)
throw ex
} else {
- Thread.sleep(math.pow(2000, workerStartRetry).toInt)
+ TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong)
}
}
}
})
- workerThread.setName(s"worker ${i} starter thread")
+ workerThread.setName(s"worker $i starter thread")
workerThread
}
threads.foreach(_.start())
@@ -228,7 +229,7 @@ trait MiniClusterFeature extends Logging {
workerInfos.foreach { case (worker, _) =>
assert(worker.registered.get()) }
allWorkersStarted = true
} catch {
- case ex: Exception =>
+ case ex: Throwable =>
logError("all workers haven't been started retrying", ex)
Thread.sleep(5000)
workersWaitingTime += 5000