This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 109b1e4a79d [SPARK-46346][CORE] Fix Master to update a worker from `UNKNOWN` to `ALIVE` on `RegisterWorker` msg

109b1e4a79d is described below

commit 109b1e4a79d9a5ec33944887a34c92d453016902
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Sun Dec 10 11:19:46 2023 -0800

[SPARK-46346][CORE] Fix Master to update a worker from `UNKNOWN` to `ALIVE` on `RegisterWorker` msg

### What changes were proposed in this pull request?

This PR fixes the `Spark Master` recovery process so that it updates a worker's status from `UNKNOWN` to `ALIVE` when it receives a `RegisterWorker` message from that worker.

### Why are the changes needed?

This only happens during recovery.
- `Master` already holds the recovered worker information in memory with `UNKNOWN` status.
- `Worker` sends a `RegisterWorker` message correctly.
- `Master` keeps the worker's status as `UNKNOWN` and replies with a `RegisteredWorker` message whose duplicate flag is set.
- `Worker` receives this reply, logs it as a duplicate but successful registration (see the log below), and does not try to reconnect.

```
23/12/09 23:49:57 INFO Worker: Retrying connection to master (attempt # 3)
23/12/09 23:49:57 INFO Worker: Connecting to master ...:7077...
23/12/09 23:50:04 INFO TransportClientFactory: Successfully created connection to master...:7077 after 7089 ms (0 ms spent in bootstraps)
23/12/09 23:50:04 WARN Worker: Duplicate registration at master spark://...
23/12/09 23:50:04 INFO Worker: Successfully registered with master spark://...
```

Workers left in `UNKNOWN` status block the recovery process and cause a long delay.

https://github.com/apache/spark/blob/bac3492980a3e793065a9e9d511ddf0fb66357b3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L604-L606

After the delay, `Master` simply removes all of them.
https://github.com/apache/spark/blob/bac3492980a3e793065a9e9d511ddf0fb66357b3/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L647-L649

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

This case is hard to cover with a unit test, so it was tested manually.

- Master
```
23/12/10 04:58:30 WARN OneWayOutboxMessage: Failed to send one-way RPC.
java.io.IOException: Connecting to /***:1024 timed out (10000 ms)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:291)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:214)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:226)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
23/12/10 04:58:54 INFO Master: Registering worker ***:1024 with 2 cores, 23.0 GiB RAM
23/12/10 04:58:54 INFO Master: Worker has been re-registered: worker-20231210045613-***-1024
```

- Worker
```
23/12/10 04:58:45 INFO Worker: Retrying connection to master (attempt # 5)
23/12/10 04:58:45 INFO Worker: Connecting to master master:7077...
23/12/10 04:58:54 INFO TransportClientFactory: Successfully created connection to master/...:7077 after 63957 ms (0 ms spent in bootstraps)
23/12/10 04:58:54 WARN Worker: Duplicate registration at master spark://master:7077
23/12/10 04:58:54 INFO Worker: Successfully registered with master spark://master:7077
23/12/10 04:58:54 INFO Worker: WorkerWebUI is available at https://...-1***-1024
23/12/10 04:58:54 INFO Worker: Worker cleanup enabled; old application directories will be deleted in: /data/spark
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #44280 from dongjoon-hyun/SPARK-46346.

Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7346c80aff4..a550f44fc0a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -286,6 +286,10 @@ private[deploy] class Master(
       if (state == RecoveryState.STANDBY) {
         workerRef.send(MasterInStandby)
       } else if (idToWorker.contains(id)) {
+        if (idToWorker(id).state == WorkerState.UNKNOWN) {
+          logInfo("Worker has been re-registered: " + id)
+          idToWorker(id).state = WorkerState.ALIVE
+        }
         workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress, true))
       } else {
         val workerResources =
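The effect of the four added lines can be illustrated with a hypothetical, heavily simplified Scala model. This is not Spark's `Master` code: `MiniMaster`, `recoverWorker`, `registerWorker`, and the `applyFix` switch are illustrative names, and the reduced `canCompleteRecovery` only checks workers (Spark's own check also considers applications). The sketch assumes, as described above, that workers restored during recovery start as `UNKNOWN` and that recovery can only finish early once none remain in that state.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's WorkerState enumeration (illustration only).
object WorkerState extends Enumeration {
  val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
}

// Hypothetical, minimal worker record: just an id and a mutable state.
final class WorkerInfo(val id: String, var state: WorkerState.Value)

// Hypothetical model of the Master's worker bookkeeping during recovery.
// `applyFix` toggles the UNKNOWN -> ALIVE transition added by this commit.
final class MiniMaster(applyFix: Boolean) {
  val idToWorker = mutable.HashMap.empty[String, WorkerInfo]

  // Workers restored from persisted state start out as UNKNOWN.
  def recoverWorker(id: String): Unit =
    idToWorker.update(id, new WorkerInfo(id, WorkerState.UNKNOWN))

  // Handles a RegisterWorker message and returns the duplicate flag that
  // the RegisteredWorker reply would carry.
  def registerWorker(id: String): Boolean =
    idToWorker.get(id) match {
      case Some(worker) =>
        // Mirrors the added branch: an already-known UNKNOWN worker is revived.
        if (applyFix && worker.state == WorkerState.UNKNOWN) {
          worker.state = WorkerState.ALIVE
        }
        true
      case None =>
        idToWorker.update(id, new WorkerInfo(id, WorkerState.ALIVE))
        false
    }

  // Simplified: only the worker half of the check; applications are ignored.
  def canCompleteRecovery: Boolean =
    idToWorker.values.forall(_.state != WorkerState.UNKNOWN)
}

object Demo {
  def main(args: Array[String]): Unit =
    for (fix <- Seq(false, true)) {
      val master = new MiniMaster(applyFix = fix)
      master.recoverWorker("worker-1")
      val duplicate = master.registerWorker("worker-1")
      println(s"applyFix=$fix duplicate=$duplicate canCompleteRecovery=${master.canCompleteRecovery}")
    }
}
```

In this model, without the fix the re-registering worker gets the duplicate flag but stays `UNKNOWN`, so `canCompleteRecovery` remains `false` and recovery waits for its timeout. With the fix, the same message moves the worker to `ALIVE` and the check passes.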