This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3fe5614  [SPARK-31946][CORE] Make worker/executor decommission signal configurable
3fe5614 is described below

commit 3fe5614a7cc8f5b65a90924e9a4a535fcaf76a98
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Dec 31 13:13:02 2020 -0800

    [SPARK-31946][CORE] Make worker/executor decommission signal configurable
    
    ### What changes were proposed in this pull request?
    
    This PR makes the worker/executor decommission signal configurable.
    
    * Added confs: `spark.worker.decommission.signal` / `spark.executor.decommission.signal`
    * Renamed `WorkerSigPWRReceived` / `ExecutorSigPWRReceived` to `WorkerDecommissionSigReceived` / `ExecutorDecommissionSigReceived`
    
    ### Why are the changes needed?
    
    The current default signal, `PWR`, doesn't work on macOS: `SIGPWR` is not defined by POSIX, and macOS does not implement it. As a result, developers currently can't run an end-to-end decommission test on macOS.
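
    For context, `SIGPWR` is Linux-specific, and the JVM on macOS rejects it at registration time. A minimal sketch of what that failure looks like, using the JDK's `sun.misc.Signal` directly rather than Spark's `SignalUtils` wrapper (the probe object and its output are illustrative only):

        import sun.misc.{Signal, SignalHandler}

        object PwrProbe {
          def main(args: Array[String]): Unit = {
            try {
              // On Linux this registers a SIGPWR handler; on macOS the Signal
              // constructor throws IllegalArgumentException because SIGPWR is unknown there.
              Signal.handle(new Signal("PWR"), new SignalHandler {
                override def handle(sig: Signal): Unit =
                  println(s"Received SIG${sig.getName}")
              })
              println("SIGPWR handler registered")
            } catch {
              case e: IllegalArgumentException =>
                println(s"Cannot register SIGPWR on this platform: ${e.getMessage}")
            }
          }
        }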
    
    Besides, a configurable signal gives users more flexibility in case the default signal (`PWR`) conflicts with their own applications or environment.
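
    As an illustration of how a user might pick a different signal (the `USR2` value below is an arbitrary, hypothetical choice): the executor-side setting is a normal application conf, while the worker-side setting is read by the Worker process itself and would typically go into the worker's `spark-defaults.conf` or `SPARK_WORKER_OPTS`.

        import org.apache.spark.SparkConf

        object DecomSignalConfExample {
          // Hypothetical application conf: executors decommission on SIGUSR2 instead
          // of the default SIGPWR (the value is the signal name without the "SIG" prefix).
          val conf = new SparkConf()
            .set("spark.executor.decommission.signal", "USR2")

          // The worker-side equivalent is set where the worker starts, e.g. in the
          // worker's spark-defaults.conf:
          //   spark.worker.decommission.signal  USR2
        }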
    
    ### Does this PR introduce _any_ user-facing change?
    
    No (it's a new API for 3.2)
    
    ### How was this patch tested?
    
    Manually tested.
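
    One way (hypothetical, not necessarily the author's procedure) to exercise the handler without a full cluster is to register a handler under a configurable signal name and raise that signal in the current JVM; `USR2` here is an arbitrary choice that exists on both Linux and macOS.

        import java.util.concurrent.atomic.AtomicBoolean
        import sun.misc.{Signal, SignalHandler}

        object DecomSignalRoundTrip {
          def main(args: Array[String]): Unit = {
            val signalName = sys.props.getOrElse("decom.signal", "USR2")
            val received = new AtomicBoolean(false)

            // Register a handler under the configured name, as the Worker/executor do.
            Signal.handle(new Signal(signalName), new SignalHandler {
              override def handle(sig: Signal): Unit = {
                received.set(true)
                println(s"SIG${sig.getName} received - a real worker would decommission here")
              }
            })

            // Deliver the signal to this JVM; the handler runs on the signal-dispatch thread.
            Signal.raise(new Signal(signalName))
            Thread.sleep(500)
            assert(received.get(), s"handler for SIG$signalName never fired")
          }
        }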
    
    Closes #30968 from Ngone51/configurable-decom-signal.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../main/scala/org/apache/spark/deploy/DeployMessage.scala  |  4 ++--
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala  | 13 +++++++------
 .../spark/executor/CoarseGrainedExecutorBackend.scala       |  9 +++++----
 .../scala/org/apache/spark/internal/config/Worker.scala     |  7 +++++++
 .../scala/org/apache/spark/internal/config/package.scala    |  7 +++++++
 .../scheduler/cluster/CoarseGrainedClusterMessage.scala     |  4 ++--
 6 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index d5b5375..727cdbc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -77,10 +77,10 @@ private[deploy] object DeployMessages {
   object DecommissionWorker extends DeployMessage
 
   /**
-   * A message that sent by the Worker to itself when it receives PWR signal,
+   * A message that sent by the Worker to itself when it receives a signal,
    * indicating the Worker starts to decommission.
    */
-  object WorkerSigPWRReceived extends DeployMessage
+  object WorkerDecommissionSigReceived extends DeployMessage
 
   /**
    * A message sent from Worker to Master to tell Master that the Worker has started
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a6092f6..a3c7375 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -66,16 +66,17 @@ private[deploy] class Worker(
   Utils.checkHost(host)
   assert (port > 0)
 
-  // If worker decommissioning is enabled register a handler on PWR to shutdown.
+  // If worker decommissioning is enabled register a handler on the configured signal to shutdown.
   if (conf.get(config.DECOMMISSION_ENABLED)) {
-    logInfo("Registering SIGPWR handler to trigger decommissioning.")
-    SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
+    val signal = conf.get(config.Worker.WORKER_DECOMMISSION_SIGNAL)
+    logInfo(s"Registering SIG$signal handler to trigger decommissioning.")
+    SignalUtils.register(signal, s"Failed to register SIG$signal handler - " +
       "disabling worker decommission feature.") {
-       self.send(WorkerSigPWRReceived)
+       self.send(WorkerDecommissionSigReceived)
        true
     }
   } else {
-    logInfo("Worker decommissioning not enabled, SIGPWR will result in 
exiting.")
+    logInfo("Worker decommissioning not enabled.")
   }
 
   // A scheduled executor used to send messages at the specified time.
@@ -682,7 +683,7 @@ private[deploy] class Worker(
     case DecommissionWorker =>
       decommissionSelf()
 
-    case WorkerSigPWRReceived =>
+    case WorkerDecommissionSigReceived =>
       decommissionSelf()
       // Tell the Master that we are starting decommissioning
       // so it stops trying to launch executor/driver on us
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6a1fd57..e1d3009 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -82,9 +82,10 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   override def onStart(): Unit = {
     if (env.conf.get(DECOMMISSION_ENABLED)) {
-      logInfo("Registering PWR handler to trigger decommissioning.")
-      SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
-        "disabling executor decommission feature.") 
(self.askSync[Boolean](ExecutorSigPWRReceived))
+      val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
+      logInfo(s"Registering SIG$signal handler to trigger decommissioning.")
+      SignalUtils.register(signal, s"Failed to register SIG$signal handler - disabling" +
+        s" executor decommission feature.") 
(self.askSync[Boolean](ExecutorDecommissionSigReceived))
     }
 
     logInfo("Connecting to driver: " + driverUrl)
@@ -208,7 +209,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case ExecutorSigPWRReceived =>
+    case ExecutorDecommissionSigReceived =>
       var driverNotified = false
       try {
         driver.foreach { driverRef =>
diff --git a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
index a807271..fda3a57 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/Worker.scala
@@ -82,4 +82,11 @@ private[spark] object Worker {
       .version("2.0.2")
       .intConf
       .createWithDefault(100)
+
+  val WORKER_DECOMMISSION_SIGNAL =
+    ConfigBuilder("spark.worker.decommission.signal")
+      .doc("The signal that used to trigger the worker to start decommission.")
+      .version("3.2.0")
+      .stringConf
+      .createWithDefaultString("PWR")
 }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index cbf4a97..adaf92d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1927,6 +1927,13 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createOptional
 
+  private[spark] val EXECUTOR_DECOMMISSION_SIGNAL =
+    ConfigBuilder("spark.executor.decommission.signal")
+      .doc("The signal that used to trigger the executor to start 
decommission.")
+      .version("3.2.0")
+      .stringConf
+      .createWithDefaultString("PWR")
+
   private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
     .doc("Staging directory used while submitting applications.")
     .version("2.0.0")
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index e084453..2f17143 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -102,9 +102,9 @@ private[spark] object CoarseGrainedClusterMessages {
   // It's used for Standalone's cases, where decommission is triggered at MasterWebUI or Worker.
   object DecommissionExecutor extends CoarseGrainedClusterMessage
 
-  // A message that sent to the executor itself when it receives PWR signal,
+  // A message that sent to the executor itself when it receives a signal,
   // indicating the executor starts to decommission.
-  object ExecutorSigPWRReceived extends CoarseGrainedClusterMessage
+  object ExecutorDecommissionSigReceived extends CoarseGrainedClusterMessage
 
   case class RemoveWorker(workerId: String, host: String, message: String)
     extends CoarseGrainedClusterMessage

