This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 973e31eb2 [CELEBORN-1568] Support worker retries in MiniCluster
973e31eb2 is described below
commit 973e31eb20c7eefc17bdc2c31a938b85f5127eb3
Author: sychen <[email protected]>
AuthorDate: Tue Aug 20 10:46:21 2024 +0800
[CELEBORN-1568] Support worker retries in MiniCluster
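### What changes were proposed in this pull request?
Fix the worker start retry backoff in MiniCluster to sleep `2 ^ workerStartRetry` seconds instead of `2000 ^ workerStartRetry` milliseconds, and catch `Throwable` instead of `Exception` while waiting for all workers to register.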
### Why are the changes needed?
https://github.com/apache/celeborn/actions/runs/10417785546/job/28852691241#step:4:5804
With the current worker retry logic, the first sleep is 2000 milliseconds, the
second is 4000000 milliseconds, and the third is 8000000000 milliseconds, so the
retries are unlikely to ever complete.
```scala
Thread.sleep(math.pow(2000, workerStartRetry).toInt)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
GA
Closes #2692 from cxzl25/CELEBORN-1568.
Authored-by: sychen <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../org/apache/celeborn/service/deploy/MiniClusterFeature.scala | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
index a87015d21..1f20b71b6 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/MiniClusterFeature.scala
@@ -20,6 +20,7 @@ package org.apache.celeborn.service.deploy
import java.io.IOException
import java.net.BindException
import java.nio.file.Files
+import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.{Lock, ReentrantLock}
import scala.collection.mutable
@@ -202,12 +203,12 @@ trait MiniClusterFeature extends Logging {
logError(s"cannot start worker $i, reached to max retrying",
ex)
throw ex
} else {
- Thread.sleep(math.pow(2000, workerStartRetry).toInt)
+ TimeUnit.SECONDS.sleep(Math.pow(2, workerStartRetry).toLong)
}
}
}
})
- workerThread.setName(s"worker ${i} starter thread")
+ workerThread.setName(s"worker $i starter thread")
workerThread
}
threads.foreach(_.start())
@@ -228,7 +229,7 @@ trait MiniClusterFeature extends Logging {
workerInfos.foreach { case (worker, _) =>
assert(worker.registered.get()) }
allWorkersStarted = true
} catch {
- case ex: Exception =>
+ case ex: Throwable =>
logError("all workers haven't been started retrying", ex)
Thread.sleep(5000)
workersWaitingTime += 5000