This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0af666a  [SPARK-36509][CORE] Fix the issue that executors are never re-scheduled if the worker stops with standalone cluster
0af666a is described below

commit 0af666a310590367a80439000d74975526064c87
Author: Kousuke Saruta <saru...@oss.nttdata.com>
AuthorDate: Sat Aug 28 18:01:55 2021 +0900

    [SPARK-36509][CORE] Fix the issue that executors are never re-scheduled if the worker stops with standalone cluster

    ### What changes were proposed in this pull request?

    This PR fixes an issue where executors are never re-scheduled if the worker they run on stops.
    As a result, the application gets stuck.
    You can easily reproduce this issue with the following procedure.

    ```
    # Run master
    $ sbin/start-master.sh

    # Run worker 1
    $ SPARK_LOG_DIR=/tmp/worker1 SPARK_PID_DIR=/tmp/worker1/ sbin/start-worker.sh -c 1 -h localhost -d /tmp/worker1 --webui-port 8081 spark://<hostname>:7077

    # Run worker 2
    $ SPARK_LOG_DIR=/tmp/worker2 SPARK_PID_DIR=/tmp/worker2/ sbin/start-worker.sh -c 1 -h localhost -d /tmp/worker2 --webui-port 8082 spark://<hostname>:7077

    # Run Spark Shell
    $ bin/spark-shell --master spark://<hostname>:7077 --executor-cores 1 --total-executor-cores 1

    # Check which worker the executor runs on and then kill the worker.
    $ kill <worker pid>
    ```

    With the procedure above, we expect the executor to be re-scheduled on the other worker, but it is not.

    The reason seems to be that `Master.schedule` cannot be called after the worker is marked as `WorkerState.DEAD`.
    So, the solution this PR proposes is to call `Master.schedule` whenever `Master.removeWorker` is called.

    This PR also fixes an issue where `ExecutorRunner` can send an `ExecutorStateChanged` message without changing its state.
    This issue causes an assertion error.
    ```
    2021-08-13 14:05:37,991 [dispatcher-event-loop-9] ERROR: Ignoring error
    java.lang.AssertionError: assertion failed: executor 0 state transfer from RUNNING to RUNNING is illegal
    ```

    ### Why are the changes needed?

    It's a critical bug.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Manually tested with the procedure shown above and confirmed the executor is re-scheduled.

    Closes #33818 from sarutak/fix-scheduling-stuck.

    Authored-by: Kousuke Saruta <saru...@oss.nttdata.com>
    Signed-off-by: Kousuke Saruta <saru...@oss.nttdata.com>
    (cherry picked from commit ea8c31e5ea233da4407f6821b2d6dd7f3c88f8d9)
    Signed-off-by: Kousuke Saruta <saru...@oss.nttdata.com>
---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala         | 1 +
 core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9f1b36a..1cbeacf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -964,6 +964,7 @@ private[deploy] class Master(
       app.driver.send(WorkerRemoved(worker.id, worker.host, msg))
     }
     persistenceEngine.removeWorker(worker)
+    schedule()
   }
 
   private def relaunchDriver(driver: DriverInfo): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 974c2d6..40d9407 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -83,7 +83,7 @@ private[deploy] class ExecutorRunner(
     shutdownHook = ShutdownHookManager.addShutdownHook { () =>
       // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
       // be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
-      if (state == ExecutorState.LAUNCHING) {
+      if (state == ExecutorState.LAUNCHING || state == ExecutorState.RUNNING) {
         state = ExecutorState.FAILED
       }
       killProcess(Some("Worker shutting down")) }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org