agrawaldevesh commented on a change in pull request #29367:
URL: https://github.com/apache/spark/pull/29367#discussion_r467485361



##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.decommissioning) {

Review comment:
       I am just trying to follow along this code, so pardon me if this is a 
n00b question: Why are we separately tracking pendingRemoval and 
decommissioning separately ? Two questions about that:
   
   - If an executor is marked as decommissioned here, when is it actually 
removed ? (Outside of dynamic allocation that happens when the executor 
naturally has a heartbeat failure. ). 
   - Is my understanding correct that if graceful decommissioning is plugged 
into dynamic-allocation (this feature) AND the cluster manager supports 
decommissioning, then pendingRemoval would be empty -- ie executors would only 
be decommission ?

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor(
       //
       // This means that an executor may be marked as having shuffle data, and 
thus prevented
       // from being removed, even though the data may not be used.
+      // TODO: Only track used files (SPARK-31974)

Review comment:
       Is this comment change intended ?

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -326,18 +352,33 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.decommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
-    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
-      return
-    }
     val exec = 
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
       UNKNOWN_RESOURCE_PROFILE_ID)
+
+    // Check if it is a shuffle file, or RDD to pick the correct codepath for 
update
+    if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && 
shuffleTrackingEnabled) {
+      /**
+       * The executor monitor keeps track of locations of cache and shuffle 
blocks and this can be
+       * used to decide which executor(s) Spark should shutdown first. Since 
we move shuffle blocks
+       * around now this wires it up so that it keeps track of it. We only do 
this for data blocks
+       * as index and other blocks blocks do not necessarily mean the entire 
block has been
+       * committed.
+       */
+      event.blockUpdatedInfo.blockId match {
+        case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
+        case _ => // For now we only update on data blocks
+      }
+      return
+    } else if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {

Review comment:
       just a code nit to avoid two returns's:
   
   Would it make sense to put this shuffle block check inside the if-branch of 
not-instanceof-RDDblockId ? like:
   
   ```
       if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
         if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && 
shuffleTrackingEnabled) {
          ....
         }
        return
       }
   ```

##########
File path: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -503,6 +504,102 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Seq[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning
+    if (adjustTargetNumExecutors) {

Review comment:
       Is the comment above accurate ? It seems we are indeed replacing the 
executors that are decommissioned when adjustTargetNumExecutors = true. 
   
   On a related note, should `adjustTargetNumExecutors` be simply renamed as 
`replaceDecommissionedExecutors` ? to make the meaning be more direct ?

##########
File path: 
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
##########
@@ -133,7 +135,13 @@ private[streaming] class ExecutorAllocationManager(
       logDebug(s"Removable executors (${removableExecIds.size}): 
${removableExecIds}")
       if (removableExecIds.nonEmpty) {
         val execIdToRemove = 
removableExecIds(Random.nextInt(removableExecIds.size))
-        client.killExecutor(execIdToRemove)
+        if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+          client.decommissionExecutor(execIdToRemove,
+            ExecutorDecommissionInfo("spark scale down", false),
+            adjustTargetNumExecutors = true)

Review comment:
       I feel that this is the ONLY place where adjustTargetNumExecutors should 
be set to true. 

##########
File path: 
streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
##########
@@ -83,12 +96,26 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
             Map.empty)}
       }
 
-      /** Verify that a particular executor was killed */
+      /** Verify that a particular executor was scaled down. */
       def verifyKilledExec(expectedKilledExec: Option[String]): Unit = {

Review comment:
       Do you want to rename the method to verifyScaledDownExec ? to make it 
match the comment change ?

##########
File path: 
core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
##########
@@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with 
LocalSparkContext {
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
-    execs.foreach(execId => sched.decommissionExecutor(execId, 
ExecutorDecommissionInfo("", false)))
+    // Make the executors decommission, finish, exit, and not be replaced.
+    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", false)))
+    sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = 
true)

Review comment:
       I am confused here: `adjustTargetNumExecutors = true` means that the 
executor should be replaced (IIUC). Whereas the comment above says "not be 
replaced".

##########
File path: 
core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -190,7 +190,8 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
     logInfo(s"Decommissioning executor ${execToDecommission}")
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", isHostDecommissioned = false))
+      ExecutorDecommissionInfo("", isHostDecommissioned = false),
+      adjustTargetNumExecutors = true)

Review comment:
       Why is this true and not false ? We explicitly want to kill and discard 
the executor here without replacing it. Although the test does not truly care, 
but still why the change ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to