agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r466154263



##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
##########
@@ -81,6 +82,43 @@ private[spark] trait ExecutorAllocationClient {
     countFailures: Boolean,
     force: Boolean = false): Seq[String]
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * Default implementation delegates to kill, scheduler must override
+   * if it supports graceful decommissioning.
+   *
+   * @param executors identifiers of executors & decom info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  def decommissionExecutors(
+    executors: Seq[(String, ExecutorDecommissionInfo)],

Review comment:
       Can this be named a bit better: how about executorsAndDecomInfo ? 

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -204,7 +205,12 @@ private[spark] class ExecutorAllocationManager(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 
0!")
     }
     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+      // If dynamic allocation shuffle tracking or worker decommissioning 
along with
+      // storage shuffle decommissioning is enabled we have *experimental* 
support for
+      // decommissioning without a shuffle service.
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+          (conf.get(WORKER_DECOMMISSION_ENABLED) &&
+            conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
         logWarning("Dynamic allocation without a shuffle service is an 
experimental feature.")

Review comment:
       Please change the warning message to also say something about storage 
migration. 

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -149,6 +151,19 @@ private[spark] class ExecutorMonitor(
     nextTimeout.set(Long.MinValue)
   }
 
+  private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = {

Review comment:
       I am wondering if the name `markExecutorsDecommissioned` would reflect 
the intent of this function better ?

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
##########
@@ -81,6 +82,43 @@ private[spark] trait ExecutorAllocationClient {
     countFailures: Boolean,
     force: Boolean = false): Seq[String]
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * Default implementation delegates to kill, scheduler must override
+   * if it supports graceful decommissioning.
+   *
+   * @param executors identifiers of executors & decom info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  def decommissionExecutors(
+    executors: Seq[(String, ExecutorDecommissionInfo)],
+    adjustTargetNumExecutors: Boolean): Seq[String] = {
+    killExecutors(executors.map(_._1),
+      adjustTargetNumExecutors,
+      countFailures = false)
+  }
+
+
+  /**
+   * Request that the cluster manager decommission the specified executor.
+   * Default implementation delegates to decommissionExecutors, scheduler can 
override
+   * if it supports graceful decommissioning.
+   *
+   * @param executorId identifiers of executor to decommission
+   * @param decommissionInfo information about the decommission (reason, host 
loss)
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  def decommissionExecutor(executorId: String,

Review comment:
       If you do decide to go with this naming, then please consider marking 
this method as final. It should remain a helper method to decom a single 
executor. 
   
   Is it also possible for you to please make adjustTargetNumExecutors be an 
additional argument to this method ? Then you don't need to use 
decommissionExecutors in the CoarseGrainedSchedulerBackend to decommission a 
single executor.

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -193,7 +193,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: 
$decommissionInfo")
-        decommissionExecutor(executorId, decommissionInfo)
+        decommissionExecutors(Seq((executorId, decommissionInfo)),

Review comment:
       Per my comment above, this could be `decommissionExecutor(executorId, 
decommissionInfo, adjustTargetNumExecutors=false)`

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.pendingDecommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
     val exec = 
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
+

Review comment:
       I didn't quite understand the reason for the change in this hunk: It 
seems that we should only be changing the KILL message to DECOMMISSION message 
if I understand the commit message/PR description properly. 
   
   Are we also changing what gets decommissioned or not ?

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -314,9 +314,13 @@ private[storage] class BlockManagerDecommissioner(
     logInfo("Starting block migration thread")
     if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
       rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable)
+    } else {
+      stoppedRDD = true
     }
     if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
       
shuffleBlockMigrationRefreshExecutor.submit(shuffleBlockMigrationRefreshRunnable)
+    } else {

Review comment:
       same as above

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
##########
@@ -81,6 +82,43 @@ private[spark] trait ExecutorAllocationClient {
     countFailures: Boolean,
     force: Boolean = false): Seq[String]
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   * Default implementation delegates to kill, scheduler must override
+   * if it supports graceful decommissioning.
+   *
+   * @param executors identifiers of executors & decom info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  def decommissionExecutors(

Review comment:
       IMHO, I find this API a bit confusing: Typically when you have a 
singular/plural sort of an interface: The plural just delegates to the singular 
and the singular implements the default implementation. 
   
   From the name of the methods, I would have expected decommissionExecutors to 
call out to the decommissionExecutor, and the latter to delegate to do the 
kill. In other words the plural decommissionExecutors is just a convenience 
looping method. 
   
   I am wondering if you think the naming can be cleared up to clarify these 
two functions intents ? Otherwise the difference is a bit too subtle.

##########
File path: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
##########
@@ -565,8 +573,14 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we 
already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved.toSeq, 
adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+        val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
+          id => (id, ExecutorDecommissionInfo("spark scale down", false)))
+        client.decommissionExecutors(executorIdsWithoutHostLoss, 
adjustTargetNumExecutors = false)

Review comment:
       Question: The default implementation of the ExecutorAllocationClient is 
to call killExecutors. So what does it mean for the WORKER_DECOM_ENABLED = true 
and the default implementation being called (ie the decommissionExecutors is 
not overridden) ? Perhaps it means that decommission is enabled by a config but 
the cluster manager does not support decommission and is thus ignored ? 
   
   In such an event should we be at parity with the else codepath -- ie `force` 
should be set to false in ExecutorAllocationClient ?

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -274,7 +275,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: 
${decommissionInfo}.")
-        decommissionExecutor(executorId, decommissionInfo)
+        decommissionExecutors(Seq((executorId, decommissionInfo)),

Review comment:
       ditto

##########
File path: 
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
##########
@@ -1270,6 +1271,68 @@ class ExecutorAllocationManagerSuite extends 
SparkFunSuite {
     assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 
executor remaining)
   }
 
+  test("mock polling loop remove with decommissioning") {
+    val clock = new ManualClock(2020L)

Review comment:
       Curious what 2020 means here and why is it so ?

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(
 
       var newNextTimeout = Long.MaxValue
       timedOutExecs = executors.asScala
-        .filter { case (_, exec) => !exec.pendingRemoval && 
!exec.hasActiveShuffle }
+        .filter { case (_, exec) =>
+          !exec.pendingRemoval && !exec.hasActiveShuffle && 
!exec.pendingDecommissioning}

Review comment:
       I didn't follow what adding pendingDecommissioning means here. Perhaps a 
code comment would clarify it ?

##########
File path: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
##########
@@ -133,7 +135,12 @@ private[streaming] class ExecutorAllocationManager(
       logDebug(s"Removable executors (${removableExecIds.size}): 
${removableExecIds}")
       if (removableExecIds.nonEmpty) {
         val execIdToRemove = 
removableExecIds(Random.nextInt(removableExecIds.size))
-        client.killExecutor(execIdToRemove)
+        if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+          client.decommissionExecutor(execIdToRemove,
+            ExecutorDecommissionInfo("spark scale down", false))

Review comment:
       Just making sure that if the decommissionExecutor does indeed delegate 
down to killExecutors -- the flags passed by killExecutor (eventually to 
killExecutors) match those passed by decommissionExecutor ?
   
   Making sure that the code paths are identical if WORKER_DECOMMISSION_ENABLED 
= true but the cluster manager does not support decommissioning.

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,7 +1822,7 @@ private[spark] class BlockManager(
     }
   }
 
-  /*

Review comment:
       Unintended change ?

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
##########
@@ -43,9 +43,12 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
-  /** Decommission block managers corresponding to given set of executors */
+  /** Decommission block managers corresponding to given set of executors
+   * Non-blocking.
+   */
   def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
-    driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
+    driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds))
+    logInfo(s"Decommissioning block managers on ${executorIds}")

Review comment:
       Unintended changes in this file ? Seems to be for testing etc. 

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -276,6 +276,8 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
     }
 
     // Wait for the executor to be removed automatically after migration.
+    // This is set to a high value since github actions is sometimes high 
latency

Review comment:
       This definitely is coming from the other PR

##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -314,9 +314,13 @@ private[storage] class BlockManagerDecommissioner(
     logInfo("Starting block migration thread")
     if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
       rddBlockMigrationExecutor.submit(rddBlockMigrationRunnable)
+    } else {

Review comment:
       Is this change intended ? It smells coming from the shuffle-migration or 
executor-exit-on-decom PR. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to