This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 548ac7c  [SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling
548ac7c is described below

commit 548ac7c4af2270a6bdbf7a6f29f4846eecdc0171
Author: Holden Karau <hka...@apple.com>
AuthorDate: Wed Aug 12 17:07:18 2020 -0700

    [SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling
    
    ### What changes were proposed in this pull request?
    
    If graceful decommissioning is enabled, Spark's dynamic scaling uses this instead of directly killing executors.
    
    ### Why are the changes needed?
    
    When scaling Spark down, we should avoid triggering recomputation as much as possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Hopefully users' jobs run faster, or at least no slower. This also enables experimental shuffle-service-free dynamic scaling when graceful decommissioning is enabled (using the same code path as shuffle-tracking dynamic scaling).
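
    To make that combination concrete, here is a minimal configuration sketch. The literal config keys are my reading of the internal config entries this patch touches (e.g. WORKER_DECOMMISSION_ENABLED, STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED) and should be treated as assumptions rather than documented API; local mode is used only to keep the snippet self-contained.

    ```scala
    import org.apache.spark.{SparkConf, SparkContext}

    // Hedged sketch: dynamic allocation backed by graceful decommissioning instead of
    // an external shuffle service. The config keys are assumptions based on the
    // internal config entries referenced in this patch.
    val conf = new SparkConf()
      .setAppName("graceful-decom-dynamic-scaling-sketch")
      .setMaster("local[2]") // only to make the sketch runnable; this targets real cluster managers
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
      .set("spark.worker.decommission.enabled", "true")
      .set("spark.storage.decommission.enabled", "true")
      .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
    val sc = new SparkContext(conf)
    sc.stop()
    ```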
    
    ### How was this patch tested?
    
    For now I've extended the ExecutorAllocationManagerSuite for both core & streaming.
    
    Closes #29367 from holdenk/SPARK-31198-use-graceful-decommissioning-as-part-of-dynamic-scaling.
    
    Lead-authored-by: Holden Karau <hka...@apple.com>
    Co-authored-by: Holden Karau <hol...@pigscanfly.ca>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../apache/spark/ExecutorAllocationClient.scala    |  40 +++++
 .../apache/spark/ExecutorAllocationManager.scala   |  30 +++-
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 190 ++++++++++++---------
 .../cluster/StandaloneSchedulerBackend.scala       |   3 +-
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala |  61 ++++++-
 .../org/apache/spark/storage/BlockManager.scala    |   2 +-
 .../apache/spark/storage/BlockManagerMaster.scala  |   6 +-
 .../spark/ExecutorAllocationManagerSuite.scala     |  71 +++++++-
 .../WorkerDecommissionExtendedSuite.scala          |   3 +-
 .../spark/scheduler/WorkerDecommissionSuite.scala  |   4 +-
 .../BlockManagerDecommissionIntegrationSuite.scala |   7 +-
 project/SparkBuild.scala                           |   2 +
 .../docker/src/main/dockerfiles/spark/decom.sh     |   2 +-
 .../k8s/integrationtest/KubernetesSuite.scala      |  27 ++-
 .../integration-tests/tests/decommissioning.py     |   5 -
 .../scheduler/ExecutorAllocationManager.scala      |  10 +-
 .../scheduler/ExecutorAllocationManagerSuite.scala |  51 ++++--
 17 files changed, 380 insertions(+), 134 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 00bd006..079340a 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
+
 /**
  * A client that communicates with the cluster manager to request or kill 
executors.
  * This is currently supported only in YARN mode.
@@ -82,6 +84,44 @@ private[spark] trait ExecutorAllocationClient {
     force: Boolean = false): Seq[String]
 
   /**
+   * Request that the cluster manager decommission the specified executors.
+   * Default implementation delegates to kill, scheduler must override
+   * if it supports graceful decommissioning.
+   *
+   * @param executorsAndDecomInfo identifiers of executors & decom info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  def decommissionExecutors(
+    executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+    adjustTargetNumExecutors: Boolean): Seq[String] = {
+    killExecutors(executorsAndDecomInfo.map(_._1),
+      adjustTargetNumExecutors,
+      countFailures = false)
+  }
+
+
+  /**
+   * Request that the cluster manager decommission the specified executor.
+   * Delegates to decommissionExecutors.
+   *
+   * @param executorId identifiers of executor to decommission
+   * @param decommissionInfo information about the decommission (reason, host 
loss)
+   * @param adjustTargetNumExecutors if we should adjust the target number of 
executors.
+   * @return whether the request is acknowledged by the cluster manager.
+   */
+  final def decommissionExecutor(executorId: String,
+      decommissionInfo: ExecutorDecommissionInfo,
+      adjustTargetNumExecutors: Boolean): Boolean = {
+    val decommissionedExecutors = decommissionExecutors(
+      Array((executorId, decommissionInfo)),
+      adjustTargetNumExecutors = adjustTargetNumExecutors)
+    decommissionedExecutors.nonEmpty && 
decommissionedExecutors(0).equals(executorId)
+  }
+
+
+  /**
    * Request that the cluster manager kill every executor on the specified 
host.
    *
    * @return whether the request is acknowledged by the cluster manager.
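
A toy, self-contained mirror of the delegation pattern above (a hedged sketch, not the private[spark] trait itself): backends with no graceful path inherit the kill fallback, while a graceful backend overrides decommissionExecutors. DecomInfo and the demo object are hypothetical stand-ins.

```scala
// Toy mirror of ExecutorAllocationClient's new default methods; names below are
// illustrative, not Spark's API.
final case class DecomInfo(message: String, isHostDecommissioned: Boolean)

trait ToyAllocationClient {
  def killExecutors(ids: Seq[String]): Seq[String]

  // Default: a decommission request degrades to a plain kill.
  def decommissionExecutors(execs: Seq[(String, DecomInfo)]): Seq[String] =
    killExecutors(execs.map(_._1))

  // Single-executor helper, mirroring the final decommissionExecutor above.
  final def decommissionExecutor(id: String, info: DecomInfo): Boolean =
    decommissionExecutors(Seq(id -> info)).headOption.contains(id)
}

object ToyAllocationClientDemo extends App {
  val plain = new ToyAllocationClient {
    def killExecutors(ids: Seq[String]): Seq[String] = { println(s"hard-killing $ids"); ids }
  }
  val graceful = new ToyAllocationClient {
    def killExecutors(ids: Seq[String]): Seq[String] = ids
    override def decommissionExecutors(execs: Seq[(String, DecomInfo)]): Seq[String] = {
      execs.foreach { case (id, info) => println(s"gracefully decommissioning $id (${info.message})") }
      execs.map(_._1)
    }
  }
  println(plain.decommissionExecutor("1", DecomInfo("spark scale down", false)))    // uses kill fallback
  println(graceful.decommissionExecutor("2", DecomInfo("spark scale down", false))) // uses graceful path
}
```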
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1570f86..1cb840f 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -28,6 +28,7 @@ import com.codahale.metrics.{Gauge, MetricRegistry}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.resource.ResourceProfileManager
@@ -127,6 +128,8 @@ private[spark] class ExecutorAllocationManager(
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)
 
+  private val decommissionEnabled = conf.get(WORKER_DECOMMISSION_ENABLED)
+
   private val defaultProfileId = 
resourceProfileManager.defaultResourceProfile.id
 
   validateSettings()
@@ -204,7 +207,12 @@ private[spark] class ExecutorAllocationManager(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 
0!")
     }
     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
-      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED)) {
+      // If dynamic allocation shuffle tracking or worker decommissioning 
along with
+      // storage shuffle decommissioning is enabled we have *experimental* 
support for
+      // decommissioning without a shuffle service.
+      if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED) ||
+          (decommissionEnabled &&
+            conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))) {
         logWarning("Dynamic allocation without a shuffle service is an 
experimental feature.")
       } else if (!testing) {
         throw new SparkException("Dynamic allocation of executors requires the 
external " +
@@ -539,7 +547,9 @@ private[spark] class ExecutorAllocationManager(
         // get the running total as we remove or initialize it to the count - 
pendingRemoval
         val newExecutorTotal = numExecutorsTotalPerRpId.getOrElseUpdate(rpId,
           (executorMonitor.executorCountWithResourceProfile(rpId) -
-            executorMonitor.pendingRemovalCountPerResourceProfileId(rpId)))
+            executorMonitor.pendingRemovalCountPerResourceProfileId(rpId) -
+            executorMonitor.decommissioningPerResourceProfileId(rpId)
+          ))
         if (newExecutorTotal - 1 < minNumExecutors) {
           logDebug(s"Not removing idle executor $executorIdToBeRemoved because 
there " +
             s"are only $newExecutorTotal executor(s) left (minimum number of 
executor limit " +
@@ -565,8 +575,14 @@ private[spark] class ExecutorAllocationManager(
     } else {
       // We don't want to change our target number of executors, because we 
already did that
       // when the task backlog decreased.
-      client.killExecutors(executorIdsToBeRemoved.toSeq, 
adjustTargetNumExecutors = false,
-        countFailures = false, force = false)
+      if (decommissionEnabled) {
+        val executorIdsWithoutHostLoss = executorIdsToBeRemoved.toSeq.map(
+          id => (id, ExecutorDecommissionInfo("spark scale down", 
false))).toArray
+        client.decommissionExecutors(executorIdsWithoutHostLoss, 
adjustTargetNumExecutors = false)
+      } else {
+        client.killExecutors(executorIdsToBeRemoved.toSeq, 
adjustTargetNumExecutors = false,
+          countFailures = false, force = false)
+      }
     }
 
     // [SPARK-21834] killExecutors api reduces the target number of executors.
@@ -578,7 +594,11 @@ private[spark] class ExecutorAllocationManager(
 
     // reset the newExecutorTotal to the existing number of executors
     if (testing || executorsRemoved.nonEmpty) {
-      executorMonitor.executorsKilled(executorsRemoved.toSeq)
+      if (decommissionEnabled) {
+        executorMonitor.executorsDecommissioned(executorsRemoved)
+      } else {
+        executorMonitor.executorsKilled(executorsRemoved.toSeq)
+      }
       logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to 
idle timeout.")
       executorsRemoved.toSeq
     } else {
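
The accounting change above means executors that are decommissioning count as already leaving when the manager checks the minimum-executor floor. A standalone sketch of just that arithmetic (toy code, not the Spark class):

```scala
// Toy sketch of the min-executor guard: decommissioning executors are subtracted
// alongside those pending a hard kill before deciding whether one more can go.
object MinExecutorGuardSketch extends App {
  def canRemoveOneMore(
      running: Int,
      pendingRemoval: Int,
      decommissioning: Int,
      minExecutors: Int): Boolean = {
    val effectiveTotal = running - pendingRemoval - decommissioning
    effectiveTotal - 1 >= minExecutors
  }

  println(canRemoveOneMore(running = 5, pendingRemoval = 1, decommissioning = 2, minExecutors = 1)) // true
  println(canRemoveOneMore(running = 5, pendingRemoval = 2, decommissioning = 2, minExecutors = 1)) // false
}
```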
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 200f2d8..ca65731 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -193,7 +193,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: 
$decommissionInfo")
-        decommissionExecutor(executorId, decommissionInfo)
+        decommissionExecutor(executorId, decommissionInfo, 
adjustTargetNumExecutors = false)
 
       case RemoveWorker(workerId, host, message) =>
         removeWorker(workerId, host, message)
@@ -274,8 +274,8 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
       case DecommissionExecutor(executorId, decommissionInfo) =>
         logError(s"Received decommission executor message ${executorId}: 
${decommissionInfo}.")
-        decommissionExecutor(executorId, decommissionInfo)
-        context.reply(true)
+        context.reply(decommissionExecutor(executorId, decommissionInfo,
+          adjustTargetNumExecutors = false))
 
       case RetrieveSparkAppConfig(resourceProfileId) =>
         val rp = 
scheduler.sc.resourceProfileManager.resourceProfileFromId(resourceProfileId)
@@ -420,59 +420,6 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     /**
-     * Mark a given executor as decommissioned and stop making resource offers 
for it.
-     */
-    private def decommissionExecutor(
-        executorId: String, decommissionInfo: ExecutorDecommissionInfo): 
Boolean = {
-      val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
-        // Only bother decommissioning executors which are alive.
-        if (isExecutorActive(executorId)) {
-          executorsPendingDecommission += executorId
-          true
-        } else {
-          false
-        }
-      }
-
-      if (shouldDisable) {
-        logInfo(s"Starting decommissioning executor $executorId.")
-        try {
-          scheduler.executorDecommission(executorId, decommissionInfo)
-        } catch {
-          case e: Exception =>
-            logError(s"Unexpected error during decommissioning ${e.toString}", 
e)
-        }
-        // Send decommission message to the executor, this may be a duplicate 
since the executor
-        // could have been the one to notify us. But it's also possible the 
notification came from
-        // elsewhere and the executor does not yet know.
-        executorDataMap.get(executorId) match {
-          case Some(executorInfo) =>
-            executorInfo.executorEndpoint.send(DecommissionSelf)
-          case None =>
-            // Ignoring the executor since it is not registered.
-            logWarning(s"Attempted to decommission unknown executor 
$executorId.")
-        }
-        logInfo(s"Finished decommissioning executor $executorId.")
-
-        if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
-          try {
-            logInfo("Starting decommissioning block manager corresponding to " 
+
-              s"executor $executorId.")
-            
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
-          } catch {
-            case e: Exception =>
-              logError("Unexpected error during block manager " +
-                s"decommissioning for executor $executorId: ${e.toString}", e)
-          }
-          logInfo(s"Acknowledged decommissioning block manager corresponding 
to $executorId.")
-        }
-      } else {
-        logInfo(s"Skipping decommissioning of executor $executorId.")
-      }
-      shouldDisable
-    }
-
-    /**
      * Stop making resource offers for the given executor. The executor is 
marked as lost with
      * the loss reason still pending.
      *
@@ -503,6 +450,87 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def minRegisteredRatio: Double = _minRegisteredRatio
 
+  /**
+   * Request that the cluster manager decommission the specified executors.
+   *
+   * @param executorsAndDecomInfo Identifiers of executors & decommission info.
+   * @param adjustTargetNumExecutors whether the target number of executors 
will be adjusted down
+   *                                 after these executors have been 
decommissioned.
+   * @return the ids of the executors acknowledged by the cluster manager to 
be removed.
+   */
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean): Seq[String] = {
+
+    val executorsToDecommission = executorsAndDecomInfo.filter { case 
(executorId, _) =>
+      CoarseGrainedSchedulerBackend.this.synchronized {
+        // Only bother decommissioning executors which are alive.
+        if (isExecutorActive(executorId)) {
+          executorsPendingDecommission += executorId
+          true
+        } else {
+          false
+        }
+      }
+    }
+
+    // If we don't want to replace the executors we are decommissioning, adjust the requested total down
+    if (adjustTargetNumExecutors) {
+      adjustExecutors(executorsToDecommission.map(_._1))
+    }
+
+    executorsToDecommission.filter { case (executorId, decomInfo) =>
+      doDecommission(executorId, decomInfo)
+    }.map(_._1)
+  }
+
+
+  private def doDecommission(executorId: String,
+      decomInfo: ExecutorDecommissionInfo): Boolean = {
+
+    logInfo(s"Asking executor $executorId to decommissioning.")
+    try {
+      scheduler.executorDecommission(executorId, decomInfo)
+      if (driverEndpoint != null) {
+        logInfo("Propagating executor decommission to driver.")
+        driverEndpoint.send(DecommissionExecutor(executorId, decomInfo))
+      }
+    } catch {
+      case e: Exception =>
+        logError(s"Unexpected error during decommissioning ${e.toString}", e)
+        return false
+    }
+    // Send decommission message to the executor (it could have originated on the executor
+    // but not necessarily).
+    CoarseGrainedSchedulerBackend.this.synchronized {
+      executorDataMap.get(executorId) match {
+        case Some(executorInfo) =>
+          executorInfo.executorEndpoint.send(DecommissionSelf)
+        case None =>
+          // Ignoring the executor since it is not registered.
+          logWarning(s"Attempted to decommission unknown executor 
$executorId.")
+          return false
+      }
+    }
+    logInfo(s"Asked executor $executorId to decommission.")
+
+    if (conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+      try {
+        logInfo(s"Asking block manager corresponding to executor $executorId 
to decommission.")
+        
scheduler.sc.env.blockManager.master.decommissionBlockManagers(Seq(executorId))
+      } catch {
+        case e: Exception =>
+          logError("Unexpected error during block manager " +
+            s"decommissioning for executor $executorId: ${e.toString}", e)
+          return false
+      }
+      logInfo(s"Acknowledged decommissioning block manager corresponding to 
$executorId.")
+    }
+
+    true
+  }
+
+
   override def start(): Unit = {
     if (UserGroupInformation.isSecurityEnabled()) {
       delegationTokenManager = createTokenManager()
@@ -598,17 +626,6 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     driverEndpoint.send(RemoveWorker(workerId, host, message))
   }
 
-  /**
-   * Called by subclasses when notified of a decommissioning executor.
-   */
-  private[spark] def decommissionExecutor(
-      executorId: String, decommissionInfo: ExecutorDecommissionInfo): Unit = {
-    if (driverEndpoint != null) {
-      logInfo("Propagating executor decommission to driver.")
-      driverEndpoint.send(DecommissionExecutor(executorId, decommissionInfo))
-    }
-  }
-
   def sufficientResourcesRegistered(): Boolean = true
 
   override def isReady(): Boolean = {
@@ -761,6 +778,31 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     Future.successful(false)
 
   /**
+   * Adjust the number of executors being requested to no longer include the 
provided executors.
+   */
+  private def adjustExecutors(executorIds: Seq[String]) = {
+    if (executorIds.nonEmpty) {
+      executorIds.foreach { exec =>
+        withLock {
+          val rpId = executorDataMap(exec).resourceProfileId
+          val rp = 
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
+          if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
+            // Assume that we are killing an executor that was started by 
default and
+            // not through the request api
+            requestedTotalExecutorsPerResourceProfile(rp) = 0
+          } else {
+            val requestedTotalForRp = 
requestedTotalExecutorsPerResourceProfile(rp)
+            requestedTotalExecutorsPerResourceProfile(rp) = 
math.max(requestedTotalForRp - 1, 0)
+          }
+        }
+      }
+      doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+    } else {
+      Future.successful(true)
+    }
+  }
+
+  /**
    * Request that the cluster manager kill the specified executors.
    *
    * @param executorIds identifiers of executors to kill
@@ -798,19 +840,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       // take into account executors that are pending to be added or removed.
       val adjustTotalExecutors =
         if (adjustTargetNumExecutors) {
-          executorsToKill.foreach { exec =>
-            val rpId = executorDataMap(exec).resourceProfileId
-            val rp = 
scheduler.sc.resourceProfileManager.resourceProfileFromId(rpId)
-            if (requestedTotalExecutorsPerResourceProfile.isEmpty) {
-              // Assume that we are killing an executor that was started by 
default and
-              // not through the request api
-              requestedTotalExecutorsPerResourceProfile(rp) = 0
-            } else {
-              val requestedTotalForRp = 
requestedTotalExecutorsPerResourceProfile(rp)
-              requestedTotalExecutorsPerResourceProfile(rp) = 
math.max(requestedTotalForRp - 1, 0)
-            }
-          }
-          
doRequestTotalExecutors(requestedTotalExecutorsPerResourceProfile.toMap)
+          adjustExecutors(executorsToKill)
         } else {
           Future.successful(true)
         }
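
The newly factored-out adjustExecutors keeps the per-resource-profile request totals in step with departing executors. A toy model of that bookkeeping follows (hypothetical profile ids and executor map, not the Spark backend):

```scala
import scala.collection.mutable

// Toy model of the adjustExecutors bookkeeping: each departing executor decrements
// the requested total for its resource profile, floored at zero; an empty map means
// the executors were started by default rather than via the request API.
object AdjustExecutorsSketch extends App {
  val requestedTotalPerProfile = mutable.Map[Int, Int]()      // resourceProfileId -> requested total
  val executorToProfile = Map("1" -> 0, "2" -> 0, "3" -> 1)   // made-up assignment

  def adjustExecutors(executorIds: Seq[String]): Unit = {
    executorIds.foreach { exec =>
      val rpId = executorToProfile(exec)
      if (requestedTotalPerProfile.isEmpty) {
        requestedTotalPerProfile(rpId) = 0
      } else {
        val current = requestedTotalPerProfile.getOrElse(rpId, 0)
        requestedTotalPerProfile(rpId) = math.max(current - 1, 0)
      }
    }
  }

  requestedTotalPerProfile ++= Map(0 -> 2, 1 -> 1)
  adjustExecutors(Seq("1", "3"))
  println(requestedTotalPerProfile) // expect 0 -> 1 and 1 -> 0
}
```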
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index d921af6..3acb6f1 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -176,7 +176,8 @@ private[spark] class StandaloneSchedulerBackend(
 
   override def executorDecommissioned(fullId: String, decommissionInfo: 
ExecutorDecommissionInfo) {
     logInfo("Asked to decommission executor")
-    decommissionExecutor(fullId.split("/")(1), decommissionInfo)
+    val execId = fullId.split("/")(1)
+    decommissionExecutors(Array((execId, decommissionInfo)), 
adjustTargetNumExecutors = false)
     logInfo("Executor %s decommissioned: %s".format(fullId, decommissionInfo))
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index 4d71907..8dbdc84 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.storage.{RDDBlockId, ShuffleDataBlockId}
 import org.apache.spark.util.Clock
 
 /**
@@ -114,7 +114,8 @@ private[spark] class ExecutorMonitor(
 
       var newNextTimeout = Long.MaxValue
       timedOutExecs = executors.asScala
-        .filter { case (_, exec) => !exec.pendingRemoval && 
!exec.hasActiveShuffle }
+        .filter { case (_, exec) =>
+          !exec.pendingRemoval && !exec.hasActiveShuffle && 
!exec.decommissioning}
         .filter { case (_, exec) =>
           val deadline = exec.timeoutAt
           if (deadline > now) {
@@ -135,6 +136,7 @@ private[spark] class ExecutorMonitor(
 
   /**
    * Mark the given executors as pending to be removed. Should only be called 
in the EAM thread.
+   * This covers both kills and decommissions.
    */
   def executorsKilled(ids: Seq[String]): Unit = {
     ids.foreach { id =>
@@ -149,6 +151,19 @@ private[spark] class ExecutorMonitor(
     nextTimeout.set(Long.MinValue)
   }
 
+  private[spark] def executorsDecommissioned(ids: Seq[String]): Unit = {
+    ids.foreach { id =>
+      val tracker = executors.get(id)
+      if (tracker != null) {
+        tracker.decommissioning = true
+      }
+    }
+
+    // Recompute timed out executors in the next EAM callback, since this call 
invalidates
+    // the current list.
+    nextTimeout.set(Long.MinValue)
+  }
+
   def executorCount: Int = executors.size()
 
   def executorCountWithResourceProfile(id: Int): Int = {
@@ -171,6 +186,16 @@ private[spark] class ExecutorMonitor(
     executors.asScala.filter { case (k, v) => v.resourceProfileId == id && 
v.pendingRemoval }.size
   }
 
+  def decommissioningCount: Int = executors.asScala.count { case (_, exec) =>
+    exec.decommissioning
+  }
+
+  def decommissioningPerResourceProfileId(id: Int): Int = {
+    executors.asScala.filter { case (k, v) =>
+      v.resourceProfileId == id && v.decommissioning
+    }.size
+  }
+
   override def onJobStart(event: SparkListenerJobStart): Unit = {
     if (!shuffleTrackingEnabled) {
       return
@@ -298,6 +323,7 @@ private[spark] class ExecutorMonitor(
       //
       // This means that an executor may be marked as having shuffle data, and 
thus prevented
       // from being removed, even though the data may not be used.
+      // TODO: Only track used files (SPARK-31974)
       if (shuffleTrackingEnabled && event.reason == Success) {
         stageToShuffleID.get(event.stageId).foreach { shuffleId =>
           exec.addShuffle(shuffleId)
@@ -326,18 +352,35 @@ private[spark] class ExecutorMonitor(
     val removed = executors.remove(event.executorId)
     if (removed != null) {
       decrementExecResourceProfileCount(removed.resourceProfileId)
-      if (!removed.pendingRemoval) {
+      if (!removed.pendingRemoval || !removed.decommissioning) {
         nextTimeout.set(Long.MinValue)
       }
     }
   }
 
   override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
+    val exec = 
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
+      UNKNOWN_RESOURCE_PROFILE_ID)
+
+    // Check if it is a shuffle file or an RDD block, to pick the correct codepath for the update
     if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
+      if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] &&
+        shuffleTrackingEnabled) {
+        /**
+         * The executor monitor keeps track of locations of cache and shuffle 
blocks and this can
+         * be used to decide which executor(s) Spark should shutdown first. 
Since we move shuffle
+         * blocks around now this wires it up so that it keeps track of it. We 
only do this for
+         * data blocks as index and other blocks do not necessarily mean the entire block
+         * has been committed.
+         */
+        event.blockUpdatedInfo.blockId match {
+          case ShuffleDataBlockId(shuffleId, _, _) => 
exec.addShuffle(shuffleId)
+          case _ => // For now we only update on data blocks
+        }
+      }
       return
     }
-    val exec = 
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
-      UNKNOWN_RESOURCE_PROFILE_ID)
+
     val storageLevel = event.blockUpdatedInfo.storageLevel
     val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]
 
@@ -410,10 +453,15 @@ private[spark] class ExecutorMonitor(
   }
 
   // Visible for testing
-  def executorsPendingToRemove(): Set[String] = {
+  private[spark] def executorsPendingToRemove(): Set[String] = {
     executors.asScala.filter { case (_, exec) => exec.pendingRemoval 
}.keys.toSet
   }
 
+  // Visible for testing
+  private[spark] def executorsDecommissioning(): Set[String] = {
+    executors.asScala.filter { case (_, exec) => exec.decommissioning 
}.keys.toSet
+  }
+
   /**
    * This method should be used when updating executor state. It guards 
against a race condition in
    * which the `SparkListenerTaskStart` event is posted before the 
`SparkListenerBlockManagerAdded`
@@ -466,6 +514,7 @@ private[spark] class ExecutorMonitor(
     @volatile var timedOut: Boolean = false
 
     var pendingRemoval: Boolean = false
+    var decommissioning: Boolean = false
     var hasActiveShuffle: Boolean = false
 
     private var idleStart: Long = -1
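
The monitor now treats decommissioning as a third reason to skip an executor when looking for idle-timeout candidates. A toy sketch of that filter (TrackerState is a hypothetical stand-in for the monitor's private tracker):

```scala
// Toy sketch of the adjusted idle filter: only executors that are neither pending
// removal, nor decommissioning, nor holding active shuffle data, and whose timeout
// deadline has passed, are candidates for removal.
object IdleFilterSketch extends App {
  final case class TrackerState(
      pendingRemoval: Boolean,
      decommissioning: Boolean,
      hasActiveShuffle: Boolean,
      timeoutAt: Long)

  def timedOutExecutors(executors: Map[String, TrackerState], now: Long): Seq[String] =
    executors.collect {
      case (id, s)
          if !s.pendingRemoval && !s.decommissioning && !s.hasActiveShuffle && s.timeoutAt <= now =>
        id
    }.toSeq

  val sample = Map(
    "a" -> TrackerState(pendingRemoval = false, decommissioning = true, hasActiveShuffle = false, timeoutAt = 10L),
    "b" -> TrackerState(pendingRemoval = false, decommissioning = false, hasActiveShuffle = false, timeoutAt = 10L))
  println(timedOutExecutors(sample, now = 20L)) // List(b): "a" is skipped while decommissioning
}
```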
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6ec93df..ee534f5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1822,7 +1822,7 @@ private[spark] class BlockManager(
     }
   }
 
-  /*
+  /**
    *  Returns the last migration time and a boolean denoting if all the blocks 
have been migrated.
    *  If there are any tasks running since that time the boolean may be 
incorrect.
    */
diff --git 
a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 93492cc..f544d47 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -43,9 +43,11 @@ class BlockManagerMaster(
     logInfo("Removed " + execId + " successfully in removeExecutor")
   }
 
-  /** Decommission block managers corresponding to given set of executors */
+  /** Decommission block managers corresponding to given set of executors
+   * Non-blocking.
+   */
   def decommissionBlockManagers(executorIds: Seq[String]): Unit = {
-    driverEndpoint.ask[Unit](DecommissionBlockManagers(executorIds))
+    driverEndpoint.ask[Boolean](DecommissionBlockManagers(executorIds))
   }
 
   /** Get Replication Info for all the RDD blocks stored in given 
blockManagerId */
diff --git 
a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala 
b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5b367d2..3abe051 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -28,6 +28,7 @@ import org.scalatest.PrivateMethodTester
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.resource._
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
@@ -1270,6 +1271,68 @@ class ExecutorAllocationManagerSuite extends 
SparkFunSuite {
     assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 
executor remaining)
   }
 
+  test("mock polling loop remove with decommissioning") {
+    val clock = new ManualClock(2020L)
+    val manager = createManager(createConf(1, 20, 1, true), clock = clock)
+
+    // Remove idle executors on timeout
+    onExecutorAddedDefaultProfile(manager, "executor-1")
+    onExecutorAddedDefaultProfile(manager, "executor-2")
+    onExecutorAddedDefaultProfile(manager, "executor-3")
+    assert(executorsDecommissioning(manager).isEmpty)
+    assert(executorsPendingToRemove(manager).isEmpty)
+
+    // idle threshold not reached yet
+    clock.advance(executorIdleTimeout * 1000 / 2)
+    schedule(manager)
+    assert(manager.executorMonitor.timedOutExecutors().isEmpty)
+    assert(executorsPendingToRemove(manager).isEmpty)
+    assert(executorsDecommissioning(manager).isEmpty)
+
+    // idle threshold exceeded
+    clock.advance(executorIdleTimeout * 1000)
+    assert(manager.executorMonitor.timedOutExecutors().size === 3)
+    schedule(manager)
+    assert(executorsPendingToRemove(manager).isEmpty) // limit reached (1 
executor remaining)
+    assert(executorsDecommissioning(manager).size === 2) // limit reached (1 
executor remaining)
+
+    // Mark a subset as busy - only idle executors should be removed
+    onExecutorAddedDefaultProfile(manager, "executor-4")
+    onExecutorAddedDefaultProfile(manager, "executor-5")
+    onExecutorAddedDefaultProfile(manager, "executor-6")
+    onExecutorAddedDefaultProfile(manager, "executor-7")
+    assert(manager.executorMonitor.executorCount === 7)
+    assert(executorsPendingToRemove(manager).isEmpty) // no pending to be 
removed
+    assert(executorsDecommissioning(manager).size === 2) // 2 decommissioning
+    onExecutorBusy(manager, "executor-4")
+    onExecutorBusy(manager, "executor-5")
+    onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 
active ones)
+
+    // after scheduling, the previously timed out executor should be removed, 
since
+    // there are new active ones.
+    schedule(manager)
+    assert(executorsDecommissioning(manager).size === 3)
+
+    // advance the clock so that idle executors should time out and move to 
the pending list
+    clock.advance(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(executorsPendingToRemove(manager).size === 0)
+    assert(executorsDecommissioning(manager).size === 4)
+    assert(!executorsDecommissioning(manager).contains("executor-4"))
+    assert(!executorsDecommissioning(manager).contains("executor-5"))
+    assert(!executorsDecommissioning(manager).contains("executor-6"))
+
+    // Busy executors are now idle and should be removed
+    onExecutorIdle(manager, "executor-4")
+    onExecutorIdle(manager, "executor-5")
+    onExecutorIdle(manager, "executor-6")
+    schedule(manager)
+    assert(executorsDecommissioning(manager).size === 4)
+    clock.advance(executorIdleTimeout * 1000)
+    schedule(manager)
+    assert(executorsDecommissioning(manager).size === 6) // limit reached (1 
executor remaining)
+  }
+
   test("listeners trigger add executors correctly") {
     val manager = createManager(createConf(1, 20, 1))
     assert(addTime(manager) === NOT_SET)
@@ -1588,7 +1651,8 @@ class ExecutorAllocationManagerSuite extends 
SparkFunSuite {
   private def createConf(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
-      initialExecutors: Int = 1): SparkConf = {
+      initialExecutors: Int = 1,
+      decommissioningEnabled: Boolean = false): SparkConf = {
     val sparkConf = new SparkConf()
       .set(config.DYN_ALLOCATION_ENABLED, true)
       .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
@@ -1604,6 +1668,7 @@ class ExecutorAllocationManagerSuite extends 
SparkFunSuite {
       // SPARK-22864: effectively disable the allocation schedule by setting 
the period to a
       // really long value.
       .set(TEST_SCHEDULE_INTERVAL, 30000L)
+      .set(WORKER_DECOMMISSION_ENABLED, decommissioningEnabled)
     sparkConf
   }
 
@@ -1670,6 +1735,10 @@ class ExecutorAllocationManagerSuite extends 
SparkFunSuite {
   private def executorsPendingToRemove(manager: ExecutorAllocationManager): 
Set[String] = {
     manager.executorMonitor.executorsPendingToRemove()
   }
+
+  private def executorsDecommissioning(manager: ExecutorAllocationManager): 
Set[String] = {
+    manager.executorMonitor.executorsDecommissioning()
+  }
 }
 
 /**
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
index d95deb1..6bfd3f7 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -65,7 +65,8 @@ class WorkerDecommissionExtendedSuite extends SparkFunSuite 
with LocalSparkConte
 
       val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
       sc.getExecutorIds().tail.foreach { id =>
-        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false))
+        sched.decommissionExecutor(id, ExecutorDecommissionInfo("", false),
+          adjustTargetNumExecutors = false)
         assert(rdd3.sortByKey().collect().length === 100)
       }
     }
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
index bb0c33a..ea5be21 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionSuite.scala
@@ -76,7 +76,9 @@ class WorkerDecommissionSuite extends SparkFunSuite with 
LocalSparkContext {
     // decom.sh message passing is tested manually.
     val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
     val execs = sched.getExecutorIds()
-    execs.foreach(execId => sched.decommissionExecutor(execId, 
ExecutorDecommissionInfo("", false)))
+    // Make the executors decommission, finish, exit, and not be replaced.
+    val execsAndDecomInfo = execs.map((_, ExecutorDecommissionInfo("", 
false))).toArray
+    sched.decommissionExecutors(execsAndDecomInfo, adjustTargetNumExecutors = 
true)
     val asyncCountResult = ThreadUtils.awaitResult(asyncCount, 20.seconds)
     assert(asyncCountResult === 10)
   }
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 7cf0083..82f87a5 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -188,9 +188,12 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
 
     val execToDecommission = getCandidateExecutorToDecom.get
     logInfo(s"Decommissioning executor ${execToDecommission}")
+
+    // Decommission executor and ensure it is not relaunched by setting 
adjustTargetNumExecutors
     sched.decommissionExecutor(
       execToDecommission,
-      ExecutorDecommissionInfo("", isHostDecommissioned = false))
+      ExecutorDecommissionInfo("", isHostDecommissioned = false),
+      adjustTargetNumExecutors = true)
     val decomTime = new SystemClock().getTimeMillis()
 
     // Wait for job to finish.
@@ -276,6 +279,8 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
     }
 
     // Wait for the executor to be removed automatically after migration.
+    // This is set to a high value since github actions is sometimes high 
latency
+    // but I've never seen this go for more than a minute.
     assert(executorRemovedSem.tryAcquire(1, 5L, TimeUnit.MINUTES))
 
     // Since the RDD is cached or shuffled so further usage of same RDD should 
use the
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 5a3ac21..110c311 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -838,6 +838,8 @@ object Unidoc {
         f.getCanonicalPath.contains("org/apache/spark/shuffle") &&
         !f.getCanonicalPath.contains("org/apache/spark/shuffle/api")))
       
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/executor")))
+      
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/ExecutorAllocationClient")))
+      
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend")))
       .map(_.filterNot(f =>
         f.getCanonicalPath.contains("org/apache/spark/unsafe") &&
         
!f.getCanonicalPath.contains("org/apache/spark/unsafe/types/CalendarInterval")))
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
index 8a5208d..cd973df 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/decom.sh
@@ -32,4 +32,4 @@ timeout 60 tail --pid=${WORKER_PID} -f /dev/null
 date
 echo "Done"
 date
-sleep 30
+sleep 1
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index 279386d..28ab371 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -279,6 +279,7 @@ class KubernetesSuite extends SparkFunSuite
       appArgs = appArgs)
 
     val execPods = scala.collection.mutable.Map[String, Pod]()
+    val podsDeleted = scala.collection.mutable.HashSet[String]()
     val (patienceInterval, patienceTimeout) = {
       executorPatience match {
         case Some(patience) => (patience._1.getOrElse(INTERVAL), 
patience._2.getOrElse(TIMEOUT))
@@ -339,27 +340,21 @@ class KubernetesSuite extends SparkFunSuite
                 }
                 // Delete the pod to simulate cluster scale down/migration.
                 // This will allow the pod to remain up for the grace period
-                val pod = kubernetesTestComponents.kubernetesClient.pods()
-                  .withName(name)
-                pod.delete()
+                kubernetesTestComponents.kubernetesClient.pods()
+                  .withName(name).delete()
                 logDebug(s"Triggered pod decom/delete: $name deleted")
-                // Look for the string that indicates we should force kill the 
first
-                // Executor. This simulates the pod being fully lost.
-                logDebug("Waiting for second collect...")
+                // Make sure this pod is deleted
                 Eventually.eventually(TIMEOUT, INTERVAL) {
-                  assert(kubernetesTestComponents.kubernetesClient
-                    .pods()
-                    .withName(driverPodName)
-                    .getLog
-                    .contains("Waiting some more, please kill exec 1."),
-                    "Decommission test did not complete second collect.")
+                  assert(podsDeleted.contains(name))
+                }
+                // Then make sure this pod is replaced
+                Eventually.eventually(TIMEOUT, INTERVAL) {
+                  assert(execPods.size == 3)
                 }
-                logDebug("Force deleting")
-                val podNoGrace = pod.withGracePeriod(0)
-                podNoGrace.delete()
               }
             case Action.DELETED | Action.ERROR =>
               execPods.remove(name)
+              podsDeleted += name
           }
         }
       })
@@ -388,7 +383,6 @@ class KubernetesSuite extends SparkFunSuite
     Eventually.eventually(TIMEOUT, patienceInterval) {
       execPods.values.nonEmpty should be (true)
     }
-    execWatcher.close()
     execPods.values.foreach(executorPodChecker(_))
     Eventually.eventually(patienceTimeout, patienceInterval) {
       expectedLogOnCompletion.foreach { e =>
@@ -400,6 +394,7 @@ class KubernetesSuite extends SparkFunSuite
           s"The application did not complete, did not find str ${e}")
       }
     }
+    execWatcher.close()
   }
 
   protected def doBasicDriverPodCheck(driverPod: Pod): Unit = {
diff --git 
a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py 
b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
index d34e616..5fcad08 100644
--- a/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
+++ b/resource-managers/kubernetes/integration-tests/tests/decommissioning.py
@@ -47,11 +47,6 @@ if __name__ == "__main__":
     print("...")
     time.sleep(30)
     rdd.count()
-    print("Waiting some more, please kill exec 1.")
-    print("...")
-    time.sleep(30)
-    print("Executor node should be deleted now")
-    rdd.count()
     rdd.collect()
     print("Final accumulator value is: " + str(acc.value))
     print("Finished waiting, stopping Spark.")
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
index 58bd56c..a4b7b7a2 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManager.scala
@@ -23,7 +23,9 @@ import scala.util.Random
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Streaming._
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, Utils}
 
@@ -133,7 +135,13 @@ private[streaming] class ExecutorAllocationManager(
       logDebug(s"Removable executors (${removableExecIds.size}): 
${removableExecIds}")
       if (removableExecIds.nonEmpty) {
         val execIdToRemove = 
removableExecIds(Random.nextInt(removableExecIds.size))
-        client.killExecutor(execIdToRemove)
+        if (conf.get(WORKER_DECOMMISSION_ENABLED)) {
+          client.decommissionExecutor(execIdToRemove,
+            ExecutorDecommissionInfo("spark scale down", false),
+            adjustTargetNumExecutors = true)
+        } else {
+          client.killExecutor(execIdToRemove)
+        }
         logInfo(s"Requested to kill executor $execIdToRemove")
       } else {
         logInfo(s"No non-receiver executors to kill")
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index 65efa10..9e06625 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -27,7 +27,9 @@ import org.scalatestplus.mockito.MockitoSugar
 import org.apache.spark.{ExecutorAllocationClient, SparkConf}
 import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, 
DYN_ALLOCATION_TESTING}
 import org.apache.spark.internal.config.Streaming._
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
 import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.scheduler.ExecutorDecommissionInfo
 import org.apache.spark.streaming.{DummyInputDStream, Seconds, 
StreamingContext, TestSuiteBase}
 import org.apache.spark.util.{ManualClock, Utils}
 
@@ -44,11 +46,22 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
   }
 
   test("basic functionality") {
+    basicTest(decommissioning = false)
+  }
+
+  test("basic decommissioning") {
+    basicTest(decommissioning = true)
+  }
+
+  def basicTest(decommissioning: Boolean): Unit = {
     // Test that adding batch processing time info to allocation manager
     // causes executors to be requested and killed accordingly
+    conf.set(WORKER_DECOMMISSION_ENABLED, decommissioning)
 
     // There is 1 receiver, and exec 1 has been allocated to it
-    withAllocationManager(numReceivers = 1) { case (receiverTracker, 
allocationManager) =>
+    withAllocationManager(numReceivers = 1, conf = conf) {
+      case (receiverTracker, allocationManager) =>
+
       when(receiverTracker.allocatedExecutors).thenReturn(Map(1 -> Some("1")))
 
       /** Add data point for batch processing time and verify executor 
allocation */
@@ -83,53 +96,67 @@ class ExecutorAllocationManagerSuite extends TestSuiteBase
             Map.empty)}
       }
 
-      /** Verify that a particular executor was killed */
-      def verifyKilledExec(expectedKilledExec: Option[String]): Unit = {
-        if (expectedKilledExec.nonEmpty) {
-          verify(allocationClient, 
times(1)).killExecutor(meq(expectedKilledExec.get))
+      /** Verify that a particular executor was scaled down. */
+      def verifyScaledDownExec(expectedExec: Option[String]): Unit = {
+        if (expectedExec.nonEmpty) {
+          val decomInfo = ExecutorDecommissionInfo("spark scale down", false)
+          if (decommissioning) {
+            verify(allocationClient, times(1)).decommissionExecutor(
+              meq(expectedExec.get), meq(decomInfo), meq(true))
+            verify(allocationClient, never).killExecutor(meq(expectedExec.get))
+          } else {
+            verify(allocationClient, 
times(1)).killExecutor(meq(expectedExec.get))
+            verify(allocationClient, never).decommissionExecutor(
+              meq(expectedExec.get), meq(decomInfo), meq(true))
+          }
         } else {
-          verify(allocationClient, never).killExecutor(null)
+          if (decommissioning) {
+            verify(allocationClient, never).decommissionExecutor(null, null, 
false)
+            verify(allocationClient, never).decommissionExecutor(null, null, 
true)
+          } else {
+            verify(allocationClient, never).killExecutor(null)
+          }
         }
       }
 
       // Batch proc time = batch interval, should increase allocation by 1
       addBatchProcTimeAndVerifyAllocation(batchDurationMillis) {
         verifyTotalRequestedExecs(Some(3)) // one already allocated, increase 
allocation by 1
-        verifyKilledExec(None)
+        verifyScaledDownExec(None)
       }
 
       // Batch proc time = batch interval * 2, should increase allocation by 2
       addBatchProcTimeAndVerifyAllocation(batchDurationMillis * 2) {
         verifyTotalRequestedExecs(Some(4))
-        verifyKilledExec(None)
+        verifyScaledDownExec(None)
       }
 
       // Batch proc time slightly more than the scale up ratio, should 
increase allocation by 1
       addBatchProcTimeAndVerifyAllocation(
         batchDurationMillis * 
STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get + 1) {
         verifyTotalRequestedExecs(Some(3))
-        verifyKilledExec(None)
+        verifyScaledDownExec(None)
       }
 
       // Batch proc time slightly less than the scale up ratio, should not 
change allocation
       addBatchProcTimeAndVerifyAllocation(
         batchDurationMillis * 
STREAMING_DYN_ALLOCATION_SCALING_UP_RATIO.defaultValue.get - 1) {
         verifyTotalRequestedExecs(None)
-        verifyKilledExec(None)
+        verifyScaledDownExec(None)
       }
 
       // Batch proc time slightly more than the scale down ratio, should not 
change allocation
       addBatchProcTimeAndVerifyAllocation(
         batchDurationMillis * 
STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get + 1) {
         verifyTotalRequestedExecs(None)
-        verifyKilledExec(None)
+        verifyScaledDownExec(None)
       }
 
       // Batch proc time slightly more than the scale down ratio, should not 
change allocation
       addBatchProcTimeAndVerifyAllocation(
         batchDurationMillis * 
STREAMING_DYN_ALLOCATION_SCALING_DOWN_RATIO.defaultValue.get - 1) {
         verifyTotalRequestedExecs(None)
-        verifyKilledExec(Some("2"))
+        verifyScaledDownExec(Some("2"))
       }
     }
   }

