This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 3fe5614  [SPARK-31946][CORE] Make worker/executor decommission signal configurable
3fe5614 is described below

commit 3fe5614a7cc8f5b65a90924e9a4a535fcaf76a98
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Dec 31 13:13:02 2020 -0800

    [SPARK-31946][CORE] Make worker/executor decommission signal configurable

    ### What changes were proposed in this pull request?

    This PR proposes to make the worker/executor decommission signal configurable:

    * Added confs: `spark.worker.decommission.signal` / `spark.executor.decommission.signal`
    * Renamed `WorkerSigPWRReceived`/`ExecutorSigPWRReceived` to `WorkerDecommissionSigReceived`/`ExecutorDecommissionSigReceived`

    ### Why are the changes needed?

    The current default signal `PWR` doesn't work on macOS: `SIGPWR` is not part of POSIX, and macOS, which follows POSIX here, does not provide it. As a result, developers currently can't run end-to-end decommission tests in their macOS environments. Besides, a configurable signal is more flexible for users in case the default (`PWR`) conflicts with their own applications or environment.

    ### Does this PR introduce _any_ user-facing change?

    No (it's a new API for 3.2).

    ### How was this patch tested?

    Manually tested.

    Closes #30968 from Ngone51/configurable-decom-signal.

    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/deploy/DeployMessage.scala  |  4 ++--
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala  | 13 +++++++------
 .../spark/executor/CoarseGrainedExecutorBackend.scala       |  9 +++++----
 .../scala/org/apache/spark/internal/config/Worker.scala     |  7 +++++++
 .../scala/org/apache/spark/internal/config/package.scala    |  7 +++++++
 .../scheduler/cluster/CoarseGrainedClusterMessage.scala     |  4 ++--
 6 files changed, 30 insertions(+), 14 deletions(-)
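The new settings behave like any other Spark configuration. As a minimal sketch (the two keys are the ones added by this commit; the signal name "HUP" is only an illustration, since which signals are free depends on the OS and the JVM):

    import org.apache.spark.SparkConf

    // Minimal sketch: route decommissioning through SIGHUP instead of the
    // default SIGPWR, which does not exist on macOS. "HUP" is an example
    // signal only; pick one the target OS and JVM leave available.
    val conf = new SparkConf()
      .set("spark.worker.decommission.signal", "HUP")
      .set("spark.executor.decommission.signal", "HUP")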
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index d5b5375..727cdbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -77,10 +77,10 @@ private[deploy] object DeployMessages {
   object DecommissionWorker extends DeployMessage
 
   /**
-   * A message that sent by the Worker to itself when it receives PWR signal,
+   * A message sent by the Worker to itself when it receives a signal,
    * indicating the Worker starts to decommission.
    */
-  object WorkerSigPWRReceived extends DeployMessage
+  object WorkerDecommissionSigReceived extends DeployMessage
 
   /**
    * A message sent from Worker to Master to tell Master that the Worker has started
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a6092f6..a3c7375 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -66,16 +66,17 @@ private[deploy] class Worker(
   Utils.checkHost(host)
   assert (port > 0)
 
-  // If worker decommissioning is enabled register a handler on PWR to shutdown.
+  // If worker decommissioning is enabled, register a handler on the configured signal to shut down.
   if (conf.get(config.DECOMMISSION_ENABLED)) {
-    logInfo("Registering SIGPWR handler to trigger decommissioning.")
-    SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
+    val signal = conf.get(config.Worker.WORKER_DECOMMISSION_SIGNAL)
+    logInfo(s"Registering SIG$signal handler to trigger decommissioning.")
+    SignalUtils.register(signal, s"Failed to register SIG$signal handler - " +
       "disabling worker decommission feature.") {
-      self.send(WorkerSigPWRReceived)
+      self.send(WorkerDecommissionSigReceived)
       true
     }
   } else {
-    logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
+    logInfo("Worker decommissioning not enabled.")
   }
 
   // A scheduled executor used to send messages at the specified time.
@@ -682,7 +683,7 @@ private[deploy] class Worker(
     case DecommissionWorker =>
       decommissionSelf()
 
-    case WorkerSigPWRReceived =>
+    case WorkerDecommissionSigReceived =>
       decommissionSelf()
       // Tell the Master that we are starting decommissioning
       // so it stops trying to launch executor/driver on us
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6a1fd57..e1d3009 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -82,9 +82,10 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   override def onStart(): Unit = {
     if (env.conf.get(DECOMMISSION_ENABLED)) {
-      logInfo("Registering PWR handler to trigger decommissioning.")
-      SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
-        "disabling executor decommission feature.") (self.askSync[Boolean](ExecutorSigPWRReceived))
+      val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
+      logInfo(s"Registering SIG$signal handler to trigger decommissioning.")
+      SignalUtils.register(signal, s"Failed to register SIG$signal handler - disabling" +
+        s" executor decommission feature.") (self.askSync[Boolean](ExecutorDecommissionSigReceived))
     }
 
     logInfo("Connecting to driver: " + driverUrl)
@@ -208,7 +209,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case ExecutorSigPWRReceived =>
+    case ExecutorDecommissionSigReceived =>
       var driverNotified = false
       try {
         driver.foreach { driverRef =>
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index a807271..fda3a57 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -82,4 +82,11 @@ private[spark] object Worker {
     .version("2.0.2")
     .intConf
     .createWithDefault(100)
+
+  val WORKER_DECOMMISSION_SIGNAL =
+    ConfigBuilder("spark.worker.decommission.signal")
+      .doc("The signal used to trigger the worker to start decommissioning.")
+      .version("3.2.0")
+      .stringConf
+      .createWithDefaultString("PWR")
 }
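`SignalUtils.register`, used above, takes the signal name without the "SIG" prefix. As a rough sketch of the mechanism it wraps (an assumption, simplified from the real utility, which also goes through Spark's logging and chains any previously installed handler), JVM signal handling boils down to sun.misc.Signal:

    import sun.misc.{Signal, SignalHandler}

    // Simplified sketch of JVM signal registration (assumption: Spark's real
    // SignalUtils additionally logs and chains pre-existing handlers).
    // Signal.handle throws for signals the OS or JVM does not expose,
    // e.g. PWR on macOS, which is why registration failure must be handled.
    def register(signalName: String, failMessage: String)(action: => Boolean): Unit = {
      try {
        Signal.handle(new Signal(signalName), new SignalHandler {
          override def handle(sig: Signal): Unit = action
        })
      } catch {
        case e: Exception => println(s"$failMessage (${e.getMessage})")
      }
    }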
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index cbf4a97..adaf92d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1927,6 +1927,13 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
+    ConfigBuilder("spark.executor.decommission.signal")
+      .doc("The signal used to trigger the executor to start decommissioning.")
+      .version("3.2.0")
+      .stringConf
+      .createWithDefaultString("PWR")
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e084453..2f17143 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -102,9 +102,9 @@ private[spark] object CoarseGrainedClusterMessages {
   // It's used for Standalone's cases, where decommission is triggered at MasterWebUI or Worker.
   object DecommissionExecutor extends CoarseGrainedClusterMessage
 
-  // A message that sent to the executor itself when it receives PWR signal,
+  // A message sent to the executor itself when it receives a signal,
   // indicating the executor starts to decommission.
-  object ExecutorSigPWRReceived extends CoarseGrainedClusterMessage
+  object ExecutorDecommissionSigReceived extends CoarseGrainedClusterMessage
 
   case class RemoveWorker(workerId: String, host: String, message: String)
     extends CoarseGrainedClusterMessage
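On the "manually tested" note above: once the handler is registered, decommissioning can be exercised end to end by sending the configured signal to the worker or executor process (kill -s <signal> <pid> from a shell), or, as a hypothetical in-JVM smoke test, by raising it directly:

    import sun.misc.Signal

    // Hypothetical smoke test: raise the configured signal inside the JVM to
    // exercise the decommission path without an external kill command. Note
    // that new Signal("PWR") itself throws on macOS, where the signal does
    // not exist; substitute the signal configured for the platform.
    Signal.raise(new Signal("PWR"))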