holdenk commented on a change in pull request #29817:
URL: https://github.com/apache/spark/pull/29817#discussion_r493940745



##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -61,13 +61,34 @@ private[deploy] object DeployMessages {
   }
 
   /**
+   * An internal message that used by Master itself, in order to handle the
+   * `DecommissionWorkersOnHosts` request from `MasterWebUI` asynchronously.
+   * @param ids A collection of Worker ids, which should be decommissioned.
+   */
+  case class DecommissionWorkers(ids: Seq[String]) extends DeployMessage
+
+  /**
+   * A message that sent from Master to Worker to decommission the Worker.
+   * It's used for the case where decommission is triggered at MasterWebUI.
+   *
+   * Note that decommission a Worker will cause all the executors on that 
Worker
+   * to be decommissioned as well.
+   */
+  object DecommissionWorker extends DeployMessage
+
+  /**
+   * A message that sent to the Worker itself when it receives PWR signal,

Review comment:
       Do you mean "sent by the worker to itself"?

##########
File path: core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
##########
@@ -61,13 +61,34 @@ private[deploy] object DeployMessages {
   }
 
   /**
+   * An internal message that used by Master itself, in order to handle the
+   * `DecommissionWorkersOnHosts` request from `MasterWebUI` asynchronously.
+   * @param ids A collection of Worker ids, which should be decommissioned.
+   */
+  case class DecommissionWorkers(ids: Seq[String]) extends DeployMessage
+
+  /**
+   * A message that sent from Master to Worker to decommission the Worker.
+   * It's used for the case where decommission is triggered at MasterWebUI.
+   *
+   * Note that decommission a Worker will cause all the executors on that 
Worker
+   * to be decommissioned as well.
+   */
+  object DecommissionWorker extends DeployMessage
+
+  /**
+   * A message that sent to the Worker itself when it receives PWR signal,
+   * indicating the Worker starts to decommission.

Review comment:
       I'm not sure about this part's meaning.

##########
File path: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -166,17 +171,6 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
       } else {
-        if (decommissioned) {
-          val msg = "Asked to launch a task while decommissioned."
-          logError(msg)
-          driver match {
-            case Some(endpoint) =>
-              logInfo("Sending DecommissionExecutor to driver.")
-              endpoint.send(DecommissionExecutor(executorId, 
ExecutorDecommissionInfo(msg)))
-            case _ =>
-              logError("No registered driver to send Decommission to.")
-          }
-        }

Review comment:
       I don't think we should just take this out. Async sends could fail, so 
if we receive a request which indicates the master hasn't received our 
notification, we should resend the message.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -272,10 +268,14 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
         removeWorker(workerId, host, message)
         context.reply(true)
 
-      case DecommissionExecutor(executorId, decommissionInfo) =>
-        logError(s"Received decommission executor message ${executorId}: 
${decommissionInfo}.")
-        context.reply(decommissionExecutor(executorId, decommissionInfo,
-          adjustTargetNumExecutors = false))
+      case ExecutorDecommissioning(executorId) =>
+        logWarning(s"Received executor $executorId decommissioned message")

Review comment:
       This might be where you broke the test suite last time, so double-check 
it.

##########
File path: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -213,9 +207,17 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo(s"Received tokens of ${tokenBytes.length} bytes")
       SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
 
-    case DecommissionSelf =>
-      logInfo("Received decommission self")
+    case DecommissionExecutor =>
       decommissionSelf()
+
+    case ExecutorSigPWRReceived =>
+      decommissionSelf()
+      if (driver.nonEmpty) {

Review comment:
       Doing this after decommissionSelf introduces a race condition. It's not 
a very important one, but given that it wasn't in the original flow, I'd 
suggest swapping the order.

##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -70,7 +70,10 @@ private[deploy] class Worker(
   if (conf.get(config.DECOMMISSION_ENABLED)) {
     logInfo("Registering SIGPWR handler to trigger decommissioning.")
     SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
-      "disabling worker decommission feature.")(decommissionSelf)
+      "disabling worker decommission feature.") {
+       self.send(WorkerSigPWRReceived)

Review comment:
       I think this might mean we return from handling the signal right away 
rather than waiting for decommissionSelf to be finished. Is this an intentional 
change?
   
   Also, this will no longer report a decommissioning failure via the signal 
return value, so it may block pod deletion or other cleanup tasks longer than 
needed.

##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -668,8 +672,13 @@ private[deploy] class Worker(
       finishedApps += id
       maybeCleanupApplication(id)
 
-    case WorkerDecommission(_, _) =>
+    case DecommissionWorker =>
+      decommissionSelf()
+
+    case WorkerSigPWRReceived =>
       decommissionSelf()
+      // Tell master we starts decommissioning so it stops trying to launch 
executor/driver on us

Review comment:
       May I suggest "Tell the master that we are starting decommissioning so 
it stops trying to launch executor/driver on us"

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1809,7 +1809,9 @@ private[spark] class BlockManager(
     blocksToRemove.size
   }
 
-  def decommissionBlockManager(): Unit = synchronized {
+  def decommissionBlockManager(): Unit = 
storageEndpoint.ask(DecommissionBlockManager)

Review comment:
       Why?

##########
File path: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -213,9 +207,17 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo(s"Received tokens of ${tokenBytes.length} bytes")
       SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
 
-    case DecommissionSelf =>
-      logInfo("Received decommission self")
+    case DecommissionExecutor =>
       decommissionSelf()
+
+    case ExecutorSigPWRReceived =>
+      decommissionSelf()
+      if (driver.nonEmpty) {
+        // Tell driver we starts decommissioning so it stops trying to 
schedule us

Review comment:
       s/we starts/we are starting/
   ?

##########
File path: 
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -79,12 +79,17 @@ private[spark] class CoarseGrainedExecutorBackend(
    */
   private[executor] val taskResources = new mutable.HashMap[Long, Map[String, 
ResourceInformation]]
 
-  @volatile private var decommissioned = false
+  private var decommissioned = false
 
   override def onStart(): Unit = {
-    logInfo("Registering PWR handler.")
-    SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
-      "disabling decommission feature.")(decommissionSelf)
+    if (env.conf.get(DECOMMISSION_ENABLED)) {
+      logInfo("Registering PWR handler to trigger decommissioning.")
+      SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
+      "disabling executor decommission feature.") {
+        self.send(ExecutorSigPWRReceived)
+        true
+      }
+    }

Review comment:
       Similar comments as in Worker.scala




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to