This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new dab865b68 [CELEBORN-662] Report worker unavailable regardless of graceful
shutdown
dab865b68 is described below
commit dab865b68ba105fde7c99d8b266960481dabc2a5
Author: zky.zhoukeyong <[email protected]>
AuthorDate: Sat Jun 10 18:36:25 2023 +0800
[CELEBORN-662] Report worker unavailable regardless of graceful shutdown
### What changes were proposed in this pull request?
In this PR, the worker always reports itself as unavailable to the master,
regardless of whether graceful shutdown is turned on or off.
### Why are the changes needed?
To inform the master that a worker is shutting down as soon as possible, so that no new slots are allocated on it.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested.
Closes #1575 from waitinfuture/662.
Authored-by: zky.zhoukeyong <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/service/deploy/worker/Worker.scala | 27 +++++++++++-----------
1 file changed, 13 insertions(+), 14 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
index 9a3b050ef..55cbf737a 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
@@ -534,22 +534,21 @@ private[celeborn] class Worker(
new Thread(new Runnable {
override def run(): Unit = {
logInfo("Shutdown hook called.")
+ // During shutdown, to avoid allocate slots in this worker,
+ // add this worker to master's blacklist. When restart, register
worker will
+ // make master remove this worker from blacklist.
+ try {
+ rssHARetryClient.askSync(
+ ReportWorkerUnavailable(List(workerInfo).asJava),
+ OneWayMessageResponse.getClass)
+ } catch {
+ case e: Throwable =>
+ logError(
+ s"Fail report to master, need wait PartitionLocation auto
release: \n$partitionLocationInfo",
+ e)
+ }
shutdown.set(true)
if (gracefulShutdown) {
- // During graceful shutdown, to avoid allocate slots in this worker,
- // add this worker to master's blacklist. When restart, register
worker will
- // make master remove this worker from blacklist.
- try {
- rssHARetryClient.askSync(
- ReportWorkerUnavailable(List(workerInfo).asJava),
- OneWayMessageResponse.getClass)
- } catch {
- case e: Throwable =>
- logError(
- s"Fail report to master, need wait PartitionLocation auto
release: \n$partitionLocationInfo",
- e)
- }
-
val interval = conf.workerGracefulShutdownCheckSlotsFinishedInterval
val timeout = conf.workerGracefulShutdownCheckSlotsFinishedTimeoutMs
var waitTimes = 0