This is an automated email from the ASF dual-hosted git repository.

irashid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b312033  [SPARK-20286][CORE] Improve logic for timing out executors in dynamic allocation.
b312033 is described below

commit b312033bd33cd6cbf1f166ccaa7a5df4e3421078
Author: Marcelo Vanzin <van...@cloudera.com>
AuthorDate: Wed Jun 5 08:09:44 2019 -0500

    [SPARK-20286][CORE] Improve logic for timing out executors in dynamic allocation.
    
    This change refactors the portions of the ExecutorAllocationManager class
    that track executor state into a new class, to achieve a few goals:
    
    - make the code easier to understand
    - better separate concerns (task backlog vs. executor state)
    - less synchronization between event and allocation threads
    - less coupling between the allocation code and executor state tracking
    
    The executor tracking code was moved to a new class (ExecutorMonitor) that
    encapsulates all the logic of tracking what happens to executors and when
    they can be timed out. The logic to actually remove the executors remains
    in the EAM, since it still requires information that is not tracked by the
    new executor monitor code.
    
    Of particular interest in the executor monitor itself is a change in how
    cached blocks are tracked: instead of polling the block manager, the
    monitor now uses events to track which executors have cached blocks, and
    can also detect unpersist events and adjust the time when the executor
    should be removed accordingly. (That's the bug mentioned in the PR title.)
    
    Because of the refactoring, a few tests in the old EAM test suite were
    removed, since they're now covered by the newly added test suite. The EAM
    suite was also changed a little bit to not instantiate a SparkContext every
    time. This allowed some cleanup, and the tests also run faster.
    
    Tested with new and updated unit tests, and with multiple TPC-DS workloads
    running with dynamic allocation on; also some manual tests for the caching
    behavior.
    
    Closes #24704 from vanzin/SPARK-20286.
    
    Authored-by: Marcelo Vanzin <van...@cloudera.com>
    Signed-off-by: Imran Rashid <iras...@cloudera.com>
---
 .../apache/spark/ExecutorAllocationClient.scala    |   9 +-
 .../apache/spark/ExecutorAllocationManager.scala   | 239 +------
 .../main/scala/org/apache/spark/SparkContext.scala |   3 +-
 .../org/apache/spark/internal/config/package.scala |   8 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |   4 +
 .../spark/scheduler/dynalloc/ExecutorMonitor.scala | 283 ++++++++
 .../apache/spark/storage/BlockManagerMaster.scala  |   9 -
 .../spark/storage/BlockManagerMasterEndpoint.scala |  31 -
 .../spark/storage/BlockManagerMessages.scala       |   3 -
 ...g.apache.spark.scheduler.ExternalClusterManager |   1 -
 .../spark/ExecutorAllocationManagerSuite.scala     | 708 ++++++---------------
 .../deploy/ExternalShuffleServiceDbSuite.scala     |   9 +-
 .../scheduler/dynalloc/ExecutorMonitorSuite.scala  | 289 +++++++++
 .../spark/storage/BlockManagerInfoSuite.scala      |  14 -
 .../scheduler/ExecutorAllocationManagerSuite.scala |   2 +-
 15 files changed, 822 insertions(+), 790 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
index 63d87b4..cb965cb 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala
@@ -23,11 +23,18 @@ package org.apache.spark
  */
 private[spark] trait ExecutorAllocationClient {
 
-
   /** Get the list of currently active executors */
   private[spark] def getExecutorIds(): Seq[String]
 
   /**
+   * Whether an executor is active. An executor is active when it can be used 
to execute tasks
+   * for jobs submitted by the application.
+   *
+   * @return whether the executor with the given ID is currently active.
+   */
+  def isExecutorActive(id: String): Boolean
+
+  /**
    * Update the cluster manager on our scheduling needs. Three bits of 
information are included
    * to help it make decisions.
    * @param numExecutors The total number of executors we'd like to have. The 
cluster manager
diff --git 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala 
b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 1782027..63df7cc 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
 import org.apache.spark.metrics.source.Source
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.BlockManagerMaster
+import org.apache.spark.scheduler.dynalloc.ExecutorMonitor
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 /**
@@ -94,7 +94,7 @@ private[spark] class ExecutorAllocationManager(
     client: ExecutorAllocationClient,
     listenerBus: LiveListenerBus,
     conf: SparkConf,
-    blockManagerMaster: BlockManagerMaster)
+    clock: Clock = new SystemClock())
   extends Logging {
 
   allocationManager =>
@@ -113,11 +113,6 @@ private[spark] class ExecutorAllocationManager(
   private val sustainedSchedulerBacklogTimeoutS =
     conf.get(DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT)
 
-  // How long an executor must be idle for before it is removed (seconds)
-  private val executorIdleTimeoutS = 
conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT)
-
-  private val cachedExecutorIdleTimeoutS = 
conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT)
-
   // During testing, the methods to actually kill and add executors are mocked 
out
   private val testing = conf.get(DYN_ALLOCATION_TESTING)
 
@@ -139,20 +134,10 @@ private[spark] class ExecutorAllocationManager(
   // is the number of executors we would immediately want from the cluster 
manager.
   private var numExecutorsTarget = initialNumExecutors
 
-  // Executors that have been requested to be removed but have not been killed 
yet
-  private val executorsPendingToRemove = new mutable.HashSet[String]
-
-  // All known executors
-  private val executorIds = new mutable.HashSet[String]
-
   // A timestamp of when an addition should be triggered, or NOT_SET if it is 
not set
   // This is set when pending tasks are added but not scheduled yet
   private var addTime: Long = NOT_SET
 
-  // A timestamp for each executor of when the executor should be removed, 
indexed by the ID
-  // This is set when an executor is no longer running a task, or when it 
first registers
-  private val removeTimes = new mutable.HashMap[String, Long]
-
   // Polling loop interval (ms)
   private val intervalMillis: Long = if (Utils.isTesting) {
       conf.get(TEST_SCHEDULE_INTERVAL)
@@ -160,12 +145,11 @@ private[spark] class ExecutorAllocationManager(
       100
     }
 
-  // Clock used to schedule when executors should be added and removed
-  private var clock: Clock = new SystemClock()
-
   // Listener for Spark events that impact the allocation policy
   val listener = new ExecutorAllocationListener
 
+  val executorMonitor = new ExecutorMonitor(conf, client, clock)
+
   // Executor that handles the scheduling task.
   private val executor =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
@@ -210,12 +194,6 @@ private[spark] class ExecutorAllocationManager(
       throw new SparkException(
         s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 
0!")
     }
-    if (executorIdleTimeoutS < 0) {
-      throw new SparkException(s"${DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key} 
must be >= 0!")
-    }
-    if (cachedExecutorIdleTimeoutS < 0) {
-      throw new 
SparkException(s"${DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key} must be >= 
0!")
-    }
     // Require external shuffle service for dynamic allocation
     // Otherwise, we may lose shuffle files when killing executors
     if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
@@ -230,18 +208,12 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Use a different clock for this allocation manager. This is mainly used 
for testing.
-   */
-  def setClock(newClock: Clock): Unit = {
-    clock = newClock
-  }
-
-  /**
    * Register for scheduler callbacks to decide when to add and remove 
executors, and start
    * the scheduling task.
    */
   def start(): Unit = {
     listenerBus.addToManagementQueue(listener)
+    listenerBus.addToManagementQueue(executorMonitor)
 
     val scheduleTask = new Runnable() {
       override def run(): Unit = {
@@ -278,8 +250,7 @@ private[spark] class ExecutorAllocationManager(
   def reset(): Unit = synchronized {
     addTime = 0L
     numExecutorsTarget = initialNumExecutors
-    executorsPendingToRemove.clear()
-    removeTimes.clear()
+    executorMonitor.reset()
   }
 
   /**
@@ -307,19 +278,13 @@ private[spark] class ExecutorAllocationManager(
    * This is factored out into its own method for testing.
    */
   private def schedule(): Unit = synchronized {
-    val now = clock.getTimeMillis
-
-    val executorIdsToBeRemoved = ArrayBuffer[String]()
-    removeTimes.retain { case (executorId, expireTime) =>
-      val expired = now >= expireTime
-      if (expired) {
-        initializing = false
-        executorIdsToBeRemoved += executorId
-      }
-      !expired
+    val executorIdsToBeRemoved = executorMonitor.timedOutExecutors()
+    if (executorIdsToBeRemoved.nonEmpty) {
+      initializing = false
     }
+
     // Update executor target number only after initializing flag is unset
-    updateAndSyncNumExecutorsTarget(now)
+    updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
     if (executorIdsToBeRemoved.nonEmpty) {
       removeExecutors(executorIdsToBeRemoved)
     }
@@ -395,7 +360,7 @@ private[spark] class ExecutorAllocationManager(
     val oldNumExecutorsTarget = numExecutorsTarget
     // There's no point in wasting time ramping up to the number of executors 
we already have, so
     // make sure our target is at least as much as our current allocation:
-    numExecutorsTarget = math.max(numExecutorsTarget, executorIds.size)
+    numExecutorsTarget = math.max(numExecutorsTarget, 
executorMonitor.executorCount)
     // Boost our target with the number to add for this round:
     numExecutorsTarget += numExecutorsToAdd
     // Ensure that our target doesn't exceed what we need at the present 
moment:
@@ -455,7 +420,7 @@ private[spark] class ExecutorAllocationManager(
     val executorIdsToBeRemoved = new ArrayBuffer[String]
 
     logInfo("Request to remove executorIds: " + executors.mkString(", "))
-    val numExistingExecutors = allocationManager.executorIds.size - 
executorsPendingToRemove.size
+    val numExistingExecutors = executorMonitor.executorCount - 
executorMonitor.pendingRemovalCount
 
     var newExecutorTotal = numExistingExecutors
     executors.foreach { executorIdToBeRemoved =>
@@ -465,7 +430,7 @@ private[spark] class ExecutorAllocationManager(
       } else if (newExecutorTotal - 1 < numExecutorsTarget) {
         logDebug(s"Not removing idle executor $executorIdToBeRemoved because 
there are only " +
           s"$newExecutorTotal executor(s) left (number of executor target 
$numExecutorsTarget)")
-      } else if (canBeKilled(executorIdToBeRemoved)) {
+      } else {
         executorIdsToBeRemoved += executorIdToBeRemoved
         newExecutorTotal -= 1
       }
@@ -484,24 +449,17 @@ private[spark] class ExecutorAllocationManager(
       client.killExecutors(executorIdsToBeRemoved, adjustTargetNumExecutors = 
false,
         countFailures = false, force = false)
     }
+
     // [SPARK-21834] killExecutors api reduces the target number of executors.
     // So we need to update the target with desired value.
     client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, 
hostToLocalTaskCount)
     // reset the newExecutorTotal to the existing number of executors
     newExecutorTotal = numExistingExecutors
     if (testing || executorsRemoved.nonEmpty) {
-      executorsRemoved.foreach { removedExecutorId =>
-        // If it has an exclusive cached block then cachedExecutorIdleTimeoutS 
is used for timeout
-        val idleTimeout = if 
(blockManagerMaster.hasExclusiveCachedBlocks(removedExecutorId)) {
-          cachedExecutorIdleTimeoutS
-        } else {
-          executorIdleTimeoutS
-        }
-        newExecutorTotal -= 1
-        logInfo(s"Removing executor $removedExecutorId because it has been 
idle for " +
-          s"$idleTimeout seconds (new desired total will be 
$newExecutorTotal)")
-        executorsPendingToRemove.add(removedExecutorId)
-      }
+      newExecutorTotal -= executorsRemoved.size
+      executorMonitor.executorsKilled(executorsRemoved)
+      logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to 
idle timeout." +
+        s"(new desired total will be $newExecutorTotal)")
       executorsRemoved
     } else {
       logWarning(s"Unable to reach the cluster manager to kill executor/s " +
@@ -511,70 +469,6 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Request the cluster manager to remove the given executor.
-   * Return whether the request is acknowledged.
-   */
-  private def removeExecutor(executorId: String): Boolean = synchronized {
-    val executorsRemoved = removeExecutors(Seq(executorId))
-    executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
-  }
-
-  /**
-   * Determine if the given executor can be killed.
-   */
-  private def canBeKilled(executorId: String): Boolean = synchronized {
-    // Do not kill the executor if we are not aware of it (should never happen)
-    if (!executorIds.contains(executorId)) {
-      logWarning(s"Attempted to remove unknown executor $executorId!")
-      return false
-    }
-
-    // Do not kill the executor again if it is already pending to be killed 
(should never happen)
-    if (executorsPendingToRemove.contains(executorId)) {
-      logWarning(s"Attempted to remove executor $executorId " +
-        s"when it is already pending to be removed!")
-      return false
-    }
-
-    true
-  }
-
-  /**
-   * Callback invoked when the specified executor has been added.
-   */
-  private def onExecutorAdded(executorId: String): Unit = synchronized {
-    if (!executorIds.contains(executorId)) {
-      executorIds.add(executorId)
-      // If an executor (call this executor X) is not removed because the 
lower bound
-      // has been reached, it will no longer be marked as idle. When new 
executors join,
-      // however, we are no longer at the lower bound, and so we must mark 
executor X
-      // as idle again so as not to forget that it is a candidate for removal. 
(see SPARK-4951)
-      executorIds.filter(listener.isExecutorIdle).foreach(onExecutorIdle)
-      logInfo(s"New executor $executorId has registered (new total is 
${executorIds.size})")
-    } else {
-      logWarning(s"Duplicate executor $executorId has registered")
-    }
-  }
-
-  /**
-   * Callback invoked when the specified executor has been removed.
-   */
-  private def onExecutorRemoved(executorId: String): Unit = synchronized {
-    if (executorIds.contains(executorId)) {
-      executorIds.remove(executorId)
-      removeTimes.remove(executorId)
-      logInfo(s"Existing executor $executorId has been removed (new total is 
${executorIds.size})")
-      if (executorsPendingToRemove.contains(executorId)) {
-        executorsPendingToRemove.remove(executorId)
-        logDebug(s"Executor $executorId is no longer pending to " +
-          s"be removed (${executorsPendingToRemove.size} left)")
-      }
-    } else {
-      logWarning(s"Unknown executor $executorId has been removed!")
-    }
-  }
-
-  /**
    * Callback invoked when the scheduler receives new pending tasks.
    * This sets a time in the future that decides when executors should be added
    * if it is not already set.
@@ -598,46 +492,6 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Callback invoked when the specified executor is no longer running any 
tasks.
-   * This sets a time in the future that decides when this executor should be 
removed if
-   * the executor is not already marked as idle.
-   */
-  private def onExecutorIdle(executorId: String): Unit = synchronized {
-    if (executorIds.contains(executorId)) {
-      if (!removeTimes.contains(executorId) && 
!executorsPendingToRemove.contains(executorId)) {
-        // Note that it is not necessary to query the executors since all the 
cached blocks we are
-        // concerned with are reported to the driver. This does not include 
broadcast blocks and
-        // non-exclusive blocks which are also available via the external 
shuffle service.
-        val hasCachedBlocks = 
blockManagerMaster.hasExclusiveCachedBlocks(executorId)
-        val now = clock.getTimeMillis()
-        val timeout = {
-          if (hasCachedBlocks) {
-            // Use a different timeout if the executor has cached blocks.
-            now + cachedExecutorIdleTimeoutS * 1000
-          } else {
-            now + executorIdleTimeoutS * 1000
-          }
-        }
-        val realTimeout = if (timeout <= 0) Long.MaxValue else timeout // 
overflow
-        removeTimes(executorId) = realTimeout
-        logDebug(s"Starting idle timer for $executorId because there are no 
more tasks " +
-          s"scheduled to run on the executor (to expire in ${(realTimeout - 
now)/1000} seconds)")
-      }
-    } else {
-      logWarning(s"Attempted to mark unknown executor $executorId idle")
-    }
-  }
-
-  /**
-   * Callback invoked when the specified executor is now running a task.
-   * This resets all variables used for removing this executor.
-   */
-  private def onExecutorBusy(executorId: String): Unit = synchronized {
-    logDebug(s"Clearing idle timer for $executorId because it is now running a 
task")
-    removeTimes.remove(executorId)
-  }
-
-  /**
    * A listener that notifies the given allocation manager of when to add and 
remove executors.
    *
    * This class is intentionally conservative in its assumptions about the 
relative ordering
@@ -650,7 +504,6 @@ private[spark] class ExecutorAllocationManager(
     // Should be 0 when no stages are active.
     private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, 
mutable.HashSet[Int]]
-    private val executorIdToTaskIds = new mutable.HashMap[String, 
mutable.HashSet[Long]]
     // Number of speculative tasks to be scheduled in each stage
     private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
     // The speculative tasks started in each stage
@@ -716,23 +569,11 @@ private[spark] class ExecutorAllocationManager(
       val stageId = taskStart.stageId
       val taskId = taskStart.taskInfo.taskId
       val taskIndex = taskStart.taskInfo.index
-      val executorId = taskStart.taskInfo.executorId
 
       allocationManager.synchronized {
         if (stageIdToNumRunningTask.contains(stageId)) {
           stageIdToNumRunningTask(stageId) += 1
         }
-        // This guards against the following race condition:
-        // 1. The `SparkListenerTaskStart` event is posted before the
-        // `SparkListenerExecutorAdded` event
-        // 2. The `SparkListenerExecutorRemoved` event is posted before the
-        // `SparkListenerTaskStart` event
-        // Above cases are possible because these events are posted in 
different threads.
-        // (see SPARK-4951 SPARK-26927)
-        if (!allocationManager.executorIds.contains(executorId) &&
-            client.getExecutorIds().contains(executorId)) {
-          allocationManager.onExecutorAdded(executorId)
-        }
 
         // If this is the last pending task, mark the scheduler queue as empty
         if (taskStart.taskInfo.speculative) {
@@ -744,15 +585,10 @@ private[spark] class ExecutorAllocationManager(
         if (totalPendingTasks() == 0) {
           allocationManager.onSchedulerQueueEmpty()
         }
-
-        // Mark the executor on which this task is scheduled as busy
-        executorIdToTaskIds.getOrElseUpdate(executorId, new 
mutable.HashSet[Long]) += taskId
-        allocationManager.onExecutorBusy(executorId)
       }
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      val executorId = taskEnd.taskInfo.executorId
       val taskId = taskEnd.taskInfo.taskId
       val taskIndex = taskEnd.taskInfo.index
       val stageId = taskEnd.stageId
@@ -760,14 +596,6 @@ private[spark] class ExecutorAllocationManager(
         if (stageIdToNumRunningTask.contains(stageId)) {
           stageIdToNumRunningTask(stageId) -= 1
         }
-        // If the executor is no longer running any scheduled tasks, mark it 
as idle
-        if (executorIdToTaskIds.contains(executorId)) {
-          executorIdToTaskIds(executorId) -= taskId
-          if (executorIdToTaskIds(executorId).isEmpty) {
-            executorIdToTaskIds -= executorId
-            allocationManager.onExecutorIdle(executorId)
-          }
-        }
 
         // If the task failed, we expect it to be resubmitted later. To ensure 
we have
         // enough resources to run the resubmitted task, we need to mark the 
scheduler
@@ -785,22 +613,6 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): 
Unit = {
-      val executorId = executorAdded.executorId
-      if (executorId != SparkContext.DRIVER_IDENTIFIER) {
-        // This guards against the race condition in which the 
`SparkListenerTaskStart`
-        // event is posted before the `SparkListenerBlockManagerAdded` event, 
which is
-        // possible because these events are posted in different threads. (see 
SPARK-4951)
-        if (!allocationManager.executorIds.contains(executorId)) {
-          allocationManager.onExecutorAdded(executorId)
-        }
-      }
-    }
-
-    override def onExecutorRemoved(executorRemoved: 
SparkListenerExecutorRemoved): Unit = {
-      allocationManager.onExecutorRemoved(executorRemoved.executorId)
-    }
-
     override def onSpeculativeTaskSubmitted(speculativeTask: 
SparkListenerSpeculativeTaskSubmitted)
       : Unit = {
        val stageId = speculativeTask.stageId
@@ -842,15 +654,6 @@ private[spark] class ExecutorAllocationManager(
     }
 
     /**
-     * Return true if an executor is not currently running a task, and false 
otherwise.
-     *
-     * Note: This is not thread-safe without the caller owning the 
`allocationManager` lock.
-     */
-    def isExecutorIdle(executorId: String): Boolean = {
-      !executorIdToTaskIds.contains(executorId)
-    }
-
-    /**
      * Update the Executor placement hints (the number of tasks with locality 
preferences,
      * a map where each pair is a node and the number of tasks that would like 
to be scheduled
      * on that node).
@@ -892,8 +695,8 @@ private[spark] class ExecutorAllocationManager(
     }
 
     registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0)
-    registerGauge("numberExecutorsPendingToRemove", 
executorsPendingToRemove.size, 0)
-    registerGauge("numberAllExecutors", executorIds.size, 0)
+    registerGauge("numberExecutorsPendingToRemove", 
executorMonitor.pendingRemovalCount, 0)
+    registerGauge("numberAllExecutors", executorMonitor.executorCount, 0)
     registerGauge("numberTargetExecutors", numExecutorsTarget, 0)
     registerGauge("numberMaxNeededExecutors", maxNumExecutorsNeeded(), 0)
   }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b00bb9a..66f8f41 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -585,8 +585,7 @@ class SparkContext(config: SparkConf) extends Logging {
         schedulerBackend match {
           case b: ExecutorAllocationClient =>
             Some(new ExecutorAllocationManager(
-              schedulerBackend.asInstanceOf[ExecutorAllocationClient], 
listenerBus, _conf,
-              _env.blockManager.master))
+              schedulerBackend.asInstanceOf[ExecutorAllocationClient], 
listenerBus, _conf))
           case _ =>
             None
         }
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 32221ee..90826bb 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -357,11 +357,15 @@ package object config {
 
   private[spark] val DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT =
     ConfigBuilder("spark.dynamicAllocation.cachedExecutorIdleTimeout")
-      .timeConf(TimeUnit.SECONDS).createWithDefault(Integer.MAX_VALUE)
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0L, "Timeout must be >= 0.")
+      .createWithDefault(Integer.MAX_VALUE)
 
   private[spark] val DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT =
     ConfigBuilder("spark.dynamicAllocation.executorIdleTimeout")
-      .timeConf(TimeUnit.SECONDS).createWithDefault(60)
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ >= 0L, "Timeout must be >= 0.")
+      .createWithDefault(60)
 
   private[spark] val DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT =
     ConfigBuilder("spark.dynamicAllocation.schedulerBacklogTimeout")
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 9f53588..5f8c277 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -540,6 +540,10 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
 
+  override def isExecutorActive(id: String): Boolean = synchronized {
+    executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
+  }
+
   override def maxNumConcurrentTasks(): Int = {
     executorDataMap.values.map { executor =>
       executor.totalCores / scheduler.CPUS_PER_TASK
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
new file mode 100644
index 0000000..9aac4d2
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.dynalloc
+
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.util.Clock
+
+/**
+ * A monitor for executor activity, used by ExecutorAllocationManager to detect idle executors.
+ */
+private[spark] class ExecutorMonitor(
+    conf: SparkConf,
+    client: ExecutorAllocationClient,
+    clock: Clock) extends SparkListener with Logging {
+
+  private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(
+    conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
+  private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(
+    conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT))
+  private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) &&
+    conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
+
+  private val executors = new ConcurrentHashMap[String, Tracker]()
+
+  // The following fields are an optimization to avoid having to scan all executors on every EAM
+  // schedule interval to find out which ones are timed out. They keep track of when the next
+  // executor timeout is expected to happen, and the current list of timed out executors. There's
+  // also a flag that forces the EAM task to recompute the timed out executors, in case some event
+  // arrives on the listener bus that may cause the current list of timed out executors to change.
+  //
+  // There's also per-executor state used for this purpose, so that recomputations can be triggered
+  // only when really necessary.
+  //
+  // Note that this isn't meant to, and cannot, always make the right decision about which executors
+  // are indeed timed out. For example, the EAM thread may detect a timed out executor while a new
+  // "task start" event has just been posted to the listener bus and hasn't yet been delivered to
+  // this listener. There are safeguards in other parts of the code that would prevent that executor
+  // from being removed.
+  private val nextTimeout = new AtomicLong(Long.MaxValue)
+  private var timedOutExecs = Seq.empty[String]
+
+  def reset(): Unit = {
+    executors.clear()
+    nextTimeout.set(Long.MaxValue)
+    timedOutExecs = Nil
+  }
+
+  /**
+   * Returns the list of executors that are currently considered to be timed out.
+   * Should only be called from the EAM thread.
+   */
+  def timedOutExecutors(): Seq[String] = {
+    val now = clock.getTimeMillis()
+    if (now >= nextTimeout.get()) {
+      // Temporarily set the next timeout at Long.MaxValue. This ensures that after
+      // scanning all executors below, we know when the next timeout for non-timed out
+      // executors is (whether that update came from the scan, or from a new event
+      // arriving in a different thread).
+      nextTimeout.set(Long.MaxValue)
+
+      var newNextTimeout = Long.MaxValue
+      timedOutExecs = executors.asScala
+        .filter { case (_, exec) => !exec.pendingRemoval }
+        .filter { case (_, exec) =>
+          val deadline = exec.timeoutAt
+          if (deadline > now) {
+            newNextTimeout = math.min(newNextTimeout, deadline)
+            exec.timedOut = false
+            false
+          } else {
+            exec.timedOut = true
+            true
+          }
+        }
+        .keys
+        .toSeq
+      updateNextTimeout(newNextTimeout)
+    }
+    timedOutExecs
+  }
+
+  /**
+   * Mark the given executors as pending to be removed. Should only be called in the EAM thread.
+   */
+  def executorsKilled(ids: Seq[String]): Unit = {
+    ids.foreach { id =>
+      val tracker = executors.get(id)
+      if (tracker != null) {
+        tracker.pendingRemoval = true
+      }
+    }
+
+    // Recompute timed out executors in the next EAM callback, since this call invalidates
+    // the current list.
+    nextTimeout.set(Long.MinValue)
+  }
+
+  def executorCount: Int = executors.size()
+
+  def pendingRemovalCount: Int = executors.asScala.count { case (_, exec) => 
exec.pendingRemoval }
+
+  override def onTaskStart(event: SparkListenerTaskStart): Unit = {
+    val executorId = event.taskInfo.executorId
+    // Guard against a late arriving task start event (SPARK-26927).
+    if (client.isExecutorActive(executorId)) {
+      val exec = ensureExecutorIsTracked(executorId)
+      exec.updateRunningTasks(1)
+    }
+  }
+
+  override def onTaskEnd(event: SparkListenerTaskEnd): Unit = {
+    val executorId = event.taskInfo.executorId
+    val exec = executors.get(executorId)
+    if (exec != null) {
+      exec.updateRunningTasks(-1)
+    }
+  }
+
+  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
+    val exec = ensureExecutorIsTracked(event.executorId)
+    exec.updateRunningTasks(0)
+    logInfo(s"New executor ${event.executorId} has registered (new total is 
${executors.size()})")
+  }
+
+  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
+    val removed = executors.remove(event.executorId)
+    if (removed != null) {
+      logInfo(s"Executor ${event.executorId} removed (new total is 
${executors.size()})")
+      if (!removed.pendingRemoval) {
+        nextTimeout.set(Long.MinValue)
+      }
+    }
+  }
+
+  override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
+    if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
+      return
+    }
+
+    val exec = 
ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId)
+    val storageLevel = event.blockUpdatedInfo.storageLevel
+    val blockId = event.blockUpdatedInfo.blockId.asInstanceOf[RDDBlockId]
+
+    // SPARK-27677. When a block can be fetched from the external shuffle service, the executor can
+    // be removed without hurting the application too much, since the cached data is still
+    // available. So don't count blocks that can be served by the external service.
+    if (storageLevel.isValid && (!fetchFromShuffleSvcEnabled || 
!storageLevel.useDisk)) {
+      val hadCachedBlocks = exec.cachedBlocks.nonEmpty
+
+      val blocks = exec.cachedBlocks.getOrElseUpdate(blockId.rddId,
+        new mutable.BitSet(blockId.splitIndex))
+      blocks += blockId.splitIndex
+
+      if (!hadCachedBlocks) {
+        exec.updateTimeout()
+      }
+    } else {
+      exec.cachedBlocks.get(blockId.rddId).foreach { blocks =>
+        blocks -= blockId.splitIndex
+        if (blocks.isEmpty) {
+          exec.cachedBlocks -= blockId.rddId
+          if (exec.cachedBlocks.isEmpty) {
+            exec.updateTimeout()
+          }
+        }
+      }
+    }
+  }
+
+  override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
+    executors.values().asScala.foreach { exec =>
+      exec.cachedBlocks -= event.rddId
+      if (exec.cachedBlocks.isEmpty) {
+        exec.updateTimeout()
+      }
+    }
+  }
+
+  // Visible for testing.
+  private[dynalloc] def isExecutorIdle(id: String): Boolean = {
+    Option(executors.get(id)).map(_.isIdle).getOrElse(throw new 
NoSuchElementException(id))
+  }
+
+  // Visible for testing
+  private[dynalloc] def timedOutExecutors(when: Long): Seq[String] = {
+    executors.asScala.flatMap { case (id, tracker) =>
+      if (tracker.timeoutAt <= when) Some(id) else None
+    }.toSeq
+  }
+
+  // Visible for testing
+  def executorsPendingToRemove(): Set[String] = {
+    executors.asScala.filter { case (_, exec) => exec.pendingRemoval 
}.keys.toSet
+  }
+
+  /**
+   * This method should be used when updating executor state. It guards against a race condition in
+   * which the `SparkListenerTaskStart` event is posted before the `SparkListenerBlockManagerAdded`
+   * event, which is possible because these events are posted in different threads. (see SPARK-4951)
+   */
+  private def ensureExecutorIsTracked(id: String): Tracker = {
+    executors.computeIfAbsent(id, _ => new Tracker())
+  }
+
+  private def updateNextTimeout(newValue: Long): Unit = {
+    while (true) {
+      val current = nextTimeout.get()
+      if (newValue >= current || nextTimeout.compareAndSet(current, newValue)) 
{
+        return
+      }
+    }
+  }
+
+  private class Tracker {
+    @volatile var timeoutAt: Long = Long.MaxValue
+
+    // Tracks whether this executor is thought to be timed out. It's used to detect when the list
+    // of timed out executors needs to be updated due to the executor's state changing.
+    @volatile var timedOut: Boolean = false
+
+    var pendingRemoval: Boolean = false
+
+    private var idleStart: Long = -1
+    private var runningTasks: Int = 0
+
+    // Maps RDD IDs to the partition IDs stored in the executor.
+    // This should only be used in the event thread.
+    val cachedBlocks = new mutable.HashMap[Int, mutable.BitSet]()
+
+    // For testing.
+    def isIdle: Boolean = idleStart >= 0
+
+    def updateRunningTasks(delta: Int): Unit = {
+      runningTasks = math.max(0, runningTasks + delta)
+      idleStart = if (runningTasks == 0) clock.getTimeMillis() else -1L
+      updateTimeout()
+    }
+
+    def updateTimeout(): Unit = {
+      val oldDeadline = timeoutAt
+      val newDeadline = if (idleStart >= 0) {
+        idleStart + (if (cachedBlocks.nonEmpty) storageTimeoutMs else 
idleTimeoutMs)
+      } else {
+        Long.MaxValue
+      }
+
+      timeoutAt = newDeadline
+
+      // If the executor was thought to be timed out, but the new deadline is later than the
+      // old one, ask the EAM thread to update the list of timed out executors.
+      if (newDeadline > oldDeadline && timedOut) {
+        nextTimeout.set(Long.MinValue)
+      } else {
+        updateNextTimeout(newDeadline)
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index b18d38f..939a134 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -222,15 +222,6 @@ class BlockManagerMaster(
     timeout.awaitResult(future)
   }
 
-  /**
-   * Find out if the executor has cached blocks which are not available via the external shuffle
-   * service.
-   * This method does not consider broadcast blocks, since they are not reported to the master.
-   */
-  def hasExclusiveCachedBlocks(executorId: String): Boolean = {
-    driverEndpoint.askSync[Boolean](HasExclusiveCachedBlocks(executorId))
-  }
-
   /** Stop the driver endpoint, called only on the Spark driver node */
   def stop() {
     if (driverEndpoint != null && isDriver) {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index 65ec1c3..2057d1b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -147,18 +147,6 @@ class BlockManagerMasterEndpoint(
 
     case BlockManagerHeartbeat(blockManagerId) =>
       context.reply(heartbeatReceived(blockManagerId))
-
-    case HasExclusiveCachedBlocks(executorId) =>
-      blockManagerIdByExecutor.get(executorId) match {
-        case Some(bm) =>
-          if (blockManagerInfo.contains(bm)) {
-            val bmInfo = blockManagerInfo(bm)
-            context.reply(bmInfo.exclusiveCachedBlocks.nonEmpty)
-          } else {
-            context.reply(false)
-          }
-        case None => context.reply(false)
-      }
   }
 
   private def removeRdd(rddId: Int): Future[Seq[Int]] = {
@@ -589,12 +577,6 @@ private[spark] class BlockManagerInfo(
   // Mapping from block id to its status.
   private val _blocks = new JHashMap[BlockId, BlockStatus]
 
-  /**
-   * Cached blocks which are not available via the external shuffle service.
-   * This does not include broadcast blocks.
-   */
-  private val _exclusiveCachedBlocks = new mutable.HashSet[BlockId]
-
   def getStatus(blockId: BlockId): Option[BlockStatus] = 
Option(_blocks.get(blockId))
 
   def updateLastSeenMs() {
@@ -662,15 +644,6 @@ private[spark] class BlockManagerInfo(
         }
       }
 
-      if (!blockId.isBroadcast) {
-        if (!externalShuffleServiceEnabled || !storageLevel.useDisk) {
-          _exclusiveCachedBlocks += blockId
-        } else if (blockExists) {
-          // removing block from the exclusive cached blocks when updated to non-exclusive
-          _exclusiveCachedBlocks -= blockId
-        }
-      }
-
       externalShuffleServiceBlockStatus.foreach { shuffleServiceBlocks =>
         if (!blockId.isBroadcast && blockStatus.diskSize > 0) {
           shuffleServiceBlocks.put(blockId, blockStatus)
@@ -679,7 +652,6 @@ private[spark] class BlockManagerInfo(
     } else if (blockExists) {
       // If isValid is not true, drop the block.
       _blocks.remove(blockId)
-      _exclusiveCachedBlocks -= blockId
       externalShuffleServiceBlockStatus.foreach { blockStatus =>
         blockStatus.remove(blockId)
       }
@@ -703,7 +675,6 @@ private[spark] class BlockManagerInfo(
         blockStatus.remove(blockId)
       }
     }
-    _exclusiveCachedBlocks -= blockId
   }
 
   def remainingMem: Long = _remainingMem
@@ -712,8 +683,6 @@ private[spark] class BlockManagerInfo(
 
   def blocks: JHashMap[BlockId, BlockStatus] = _blocks
 
-  def exclusiveCachedBlocks: collection.Set[BlockId] = _exclusiveCachedBlocks
-
   override def toString: String = "BlockManagerInfo " + timeMs + " " + 
_remainingMem
 
   def clear() {
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
index 3dbac69..382501a 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
@@ -122,8 +122,5 @@ private[spark] object BlockManagerMessages {
 
   case class BlockManagerHeartbeat(blockManagerId: BlockManagerId) extends 
ToBlockManagerMaster
 
-  case class HasExclusiveCachedBlocks(executorId: String) extends 
ToBlockManagerMaster
-
   case class IsExecutorAlive(executorId: String) extends ToBlockManagerMaster
-
 }
diff --git a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
index 1c78f1a..60054c8 100644
--- a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
+++ b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -1,4 +1,3 @@
 org.apache.spark.scheduler.DummyExternalClusterManager
 org.apache.spark.scheduler.MockExternalClusterManager
-org.apache.spark.DummyLocalExternalClusterManager
 org.apache.spark.scheduler.CSMockExternalClusterManager
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 9c30124..2b75f2e 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -23,93 +23,95 @@ import org.mockito.ArgumentMatchers.{any, eq => meq}
 import org.mockito.Mockito.{mock, never, verify, when}
 import org.scalatest.PrivateMethodTester
 
-import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config.Tests.TEST_SCHEDULE_INTERVAL
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.ExternalClusterManager
 import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.local.LocalSchedulerBackend
-import org.apache.spark.storage.BlockManagerMaster
-import org.apache.spark.util.ManualClock
+import org.apache.spark.util.{Clock, ManualClock, SystemClock}
 
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite
-  extends SparkFunSuite
-  with LocalSparkContext {
+class ExecutorAllocationManagerSuite extends SparkFunSuite {
 
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
-  private val contexts = new mutable.ListBuffer[SparkContext]()
+  private val managers = new mutable.ListBuffer[ExecutorAllocationManager]()
+  private var listenerBus: LiveListenerBus = _
+  private var client: ExecutorAllocationClient = _
 
   override def beforeEach(): Unit = {
     super.beforeEach()
-    contexts.clear()
+    managers.clear()
+    listenerBus = new LiveListenerBus(new SparkConf())
+    listenerBus.start(null, mock(classOf[MetricsSystem]))
+    client = mock(classOf[ExecutorAllocationClient])
+    when(client.isExecutorActive(any())).thenReturn(true)
   }
 
   override def afterEach(): Unit = {
     try {
-      contexts.foreach(_.stop())
+      listenerBus.stop()
+      managers.foreach(_.stop())
     } finally {
+      listenerBus = null
       super.afterEach()
     }
   }
 
-  private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = {
-    bus.post(event)
-    bus.waitUntilEmpty(1000)
+  private def post(event: SparkListenerEvent): Unit = {
+    listenerBus.post(event)
+    listenerBus.waitUntilEmpty(1000)
   }
 
-  test("verify min/max executors") {
-    val conf = new SparkConf()
-      .setMaster("myDummyLocalExternalClusterManager")
-      .setAppName("test-executor-allocation-manager")
-      .set(config.DYN_ALLOCATION_ENABLED, true)
-      .set(config.DYN_ALLOCATION_TESTING, true)
+  test("initialize dynamic allocation in SparkContext") {
+    val conf = createConf(0, 1, 0)
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName(getClass().getName())
+
     val sc0 = new SparkContext(conf)
-    contexts += sc0
-    assert(sc0.executorAllocationManager.isDefined)
-    sc0.stop()
+    try {
+      assert(sc0.executorAllocationManager.isDefined)
+    } finally {
+      sc0.stop()
+    }
+  }
 
+  test("verify min/max executors") {
     // Min < 0
-    val conf1 = conf.clone().set(config.DYN_ALLOCATION_MIN_EXECUTORS, -1)
-    intercept[SparkException] { contexts += new SparkContext(conf1) }
+    intercept[SparkException] {
+      createManager(createConf().set(config.DYN_ALLOCATION_MIN_EXECUTORS, -1))
+    }
 
     // Max < 0
-    val conf2 = conf.clone().set(config.DYN_ALLOCATION_MAX_EXECUTORS, -1)
-    intercept[SparkException] { contexts += new SparkContext(conf2) }
+    intercept[SparkException] {
+      createManager(createConf().set(config.DYN_ALLOCATION_MAX_EXECUTORS, -1))
+    }
 
     // Both min and max, but min > max
-    intercept[SparkException] { createSparkContext(2, 1) }
+    intercept[SparkException] {
+      createManager(createConf(2, 1))
+    }
 
     // Both min and max, and min == max
-    val sc1 = createSparkContext(1, 1)
-    assert(sc1.executorAllocationManager.isDefined)
-    sc1.stop()
+    createManager(createConf(1, 1))
 
     // Both min and max, and min < max
-    val sc2 = createSparkContext(1, 2)
-    assert(sc2.executorAllocationManager.isDefined)
-    sc2.stop()
+    createManager(createConf(1, 2))
   }
 
   test("starting state") {
-    sc = createSparkContext()
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf())
     assert(numExecutorsTarget(manager) === 1)
     assert(executorsPendingToRemove(manager).isEmpty)
-    assert(executorIds(manager).isEmpty)
     assert(addTime(manager) === ExecutorAllocationManager.NOT_SET)
-    assert(removeTimes(manager).isEmpty)
   }
 
   test("add executors") {
-    sc = createSparkContext(1, 10, 1)
-    val manager = sc.executorAllocationManager.get
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
+    val manager = createManager(createConf(1, 10, 1))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Keep adding until the limit is reached
     assert(numExecutorsTarget(manager) === 1)
@@ -152,24 +154,15 @@ class ExecutorAllocationManagerSuite
   }
 
   def testAllocationRatio(cores: Int, divisor: Double, expected: Int): Unit = {
-    val conf = new SparkConf()
-      .setMaster("myDummyLocalExternalClusterManager")
-      .setAppName("test-executor-allocation-manager")
-      .set(config.DYN_ALLOCATION_ENABLED, true)
-      .set(config.DYN_ALLOCATION_TESTING, true)
-      .set(config.DYN_ALLOCATION_MAX_EXECUTORS, 15)
-      .set(config.DYN_ALLOCATION_MIN_EXECUTORS, 3)
+    val conf = createConf(3, 15)
       .set(config.DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO, divisor)
       .set(config.EXECUTOR_CORES, cores)
-    val sc = new SparkContext(conf)
-    contexts += sc
-    var manager = sc.executorAllocationManager.get
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 20)))
+    val manager = createManager(conf)
+    post(SparkListenerStageSubmitted(createStageInfo(0, 20)))
     for (i <- 0 to 5) {
       addExecutors(manager)
     }
     assert(numExecutorsTarget(manager) === expected)
-    sc.stop()
   }
 
   test("executionAllocationRatio is correctly handled") {
@@ -185,9 +178,8 @@ class ExecutorAllocationManagerSuite
 
 
   test("add executors capped by num pending tasks") {
-    sc = createSparkContext(0, 10, 0)
-    val manager = sc.executorAllocationManager.get
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 5)))
+    val manager = createManager(createConf(0, 10, 0))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 5)))
 
     // Verify that we're capped at number of tasks in the stage
     assert(numExecutorsTarget(manager) === 0)
@@ -203,10 +195,10 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a task doesn't affect the target
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 3)))
-    post(sc.listenerBus, SparkListenerExecutorAdded(
+    post(SparkListenerStageSubmitted(createStageInfo(1, 3)))
+    post(SparkListenerExecutorAdded(
       0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
-    post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-1")))
+    post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 1)
     assert(numExecutorsTarget(manager) === 6)
@@ -219,9 +211,9 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that re-running a task doesn't blow things up
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 3)))
-    post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, 
"executor-1")))
-    post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, 
"executor-1")))
+    post(SparkListenerStageSubmitted(createStageInfo(2, 3)))
+    post(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1")))
+    post(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1")))
     assert(addExecutors(manager) === 1)
     assert(numExecutorsTarget(manager) === 9)
     assert(numExecutorsToAdd(manager) === 2)
@@ -230,23 +222,22 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a task once we're at our limit doesn't blow things up
-    post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, 
"executor-1")))
+    post(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1")))
     assert(addExecutors(manager) === 0)
     assert(numExecutorsTarget(manager) === 10)
   }
 
   test("add executors when speculative tasks added") {
-    sc = createSparkContext(0, 10, 0)
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf(0, 10, 0))
 
     // Verify that we're capped at number of tasks including the speculative ones in the stage
-    post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
+    post(SparkListenerSpeculativeTaskSubmitted(1))
     assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
     assert(addExecutors(manager) === 1)
-    post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
-    post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1))
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    post(SparkListenerSpeculativeTaskSubmitted(1))
+    post(SparkListenerSpeculativeTaskSubmitted(1))
+    post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
     assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 2)
     assert(addExecutors(manager) === 2)
@@ -257,39 +248,37 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a task doesn't affect the target
-    post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-1")))
+    post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
 
     // Verify that running a speculative task doesn't affect the target
-    post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, 
"executor-2", true)))
+    post(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", 
true)))
     assert(numExecutorsTarget(manager) === 5)
     assert(addExecutors(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
   }
 
   test("ignore task end events from completed stages") {
-    sc = createSparkContext(0, 10, 0)
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf(0, 10, 0))
     val stage = createStageInfo(0, 5)
-    post(sc.listenerBus, SparkListenerStageSubmitted(stage))
+    post(SparkListenerStageSubmitted(stage))
     val taskInfo1 = createTaskInfo(0, 0, "executor-1")
     val taskInfo2 = createTaskInfo(1, 1, "executor-1")
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo2))
+    post(SparkListenerTaskStart(0, 0, taskInfo1))
+    post(SparkListenerTaskStart(0, 0, taskInfo2))
 
-    post(sc.listenerBus, SparkListenerStageCompleted(stage))
+    post(SparkListenerStageCompleted(stage))
 
-    post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, 
null))
-    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, 
null))
+    post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
+    post(SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, null))
     assert(totalRunningTasks(manager) === 0)
   }
 
   testRetry("cancel pending executors when no longer needed") {
-    sc = createSparkContext(0, 10, 0)
-    val manager = sc.executorAllocationManager.get
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5)))
+    val manager = createManager(createConf(0, 10, 0))
+    post(SparkListenerStageSubmitted(createStageInfo(2, 5)))
 
     assert(numExecutorsTarget(manager) === 0)
     assert(numExecutorsToAdd(manager) === 1)
@@ -300,26 +289,25 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 3)
 
     val task1Info = createTaskInfo(0, 0, "executor-1")
-    post(sc.listenerBus, SparkListenerTaskStart(2, 0, task1Info))
+    post(SparkListenerTaskStart(2, 0, task1Info))
 
     assert(numExecutorsToAdd(manager) === 4)
     assert(addExecutors(manager) === 2)
 
     val task2Info = createTaskInfo(1, 0, "executor-1")
-    post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info))
+    post(SparkListenerTaskStart(2, 0, task2Info))
 
     task1Info.markFinished(TaskState.FINISHED, System.currentTimeMillis())
-    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, 
null))
+    post(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null))
 
     task2Info.markFinished(TaskState.FINISHED, System.currentTimeMillis())
-    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, 
null))
+    post(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null))
 
     assert(adjustRequestedExecutors(manager) === -1)
   }
 
   test("remove executors") {
-    sc = createSparkContext(5, 10, 5)
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf(5, 10, 5))
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
 
     // Keep removing until the limit is reached
@@ -332,8 +320,6 @@ class ExecutorAllocationManagerSuite
     assert(executorsPendingToRemove(manager).size === 3)
     assert(executorsPendingToRemove(manager).contains("2"))
     assert(executorsPendingToRemove(manager).contains("3"))
-    assert(!removeExecutor(manager, "100")) // remove non-existent executors
-    assert(!removeExecutor(manager, "101"))
     assert(executorsPendingToRemove(manager).size === 3)
     assert(removeExecutor(manager, "4"))
     assert(removeExecutor(manager, "5"))
@@ -368,8 +354,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("remove multiple executors") {
-    sc = createSparkContext(5, 10, 5)
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf(5, 10, 5))
     (1 to 10).map(_.toString).foreach { id => onExecutorAdded(manager, id) }
 
     // Keep removing until the limit is reached
@@ -381,8 +366,6 @@ class ExecutorAllocationManagerSuite
     assert(executorsPendingToRemove(manager).size === 3)
     assert(executorsPendingToRemove(manager).contains("2"))
     assert(executorsPendingToRemove(manager).contains("3"))
-    assert(!removeExecutor(manager, "100")) // remove non-existent executors
-    assert(removeExecutors(manager, Seq("101", "102")) !== Seq("101", "102"))
     assert(executorsPendingToRemove(manager).size === 3)
     assert(removeExecutor(manager, "4"))
     assert(removeExecutors(manager, Seq("5")) === Seq("5"))
@@ -417,28 +400,27 @@ class ExecutorAllocationManagerSuite
   }
 
   test ("Removing with various numExecutorsTarget condition") {
-    sc = createSparkContext(5, 12, 5)
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf(5, 12, 5))
 
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 8)))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 8)))
 
     // Remove when numExecutorsTarget is the same as the current number of executors
     assert(addExecutors(manager) === 1)
     assert(addExecutors(manager) === 2)
     (1 to 8).foreach(execId => onExecutorAdded(manager, execId.toString))
     (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
-      info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
-    assert(executorIds(manager).size === 8)
+      info => post(SparkListenerTaskStart(0, 0, info)) }
+    assert(manager.executorMonitor.executorCount === 8)
     assert(numExecutorsTarget(manager) === 8)
     assert(maxNumExecutorsNeeded(manager) == 8)
     assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors
 
     // Remove executors when numExecutorsTarget is lower than current number of executors
     (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { info =>
-      post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, info, null))
+      post(SparkListenerTaskEnd(0, 0, null, Success, info, null))
     }
     adjustRequestedExecutors(manager)
-    assert(executorIds(manager).size === 8)
+    assert(manager.executorMonitor.executorCount === 8)
     assert(numExecutorsTarget(manager) === 5)
     assert(maxNumExecutorsNeeded(manager) == 5)
     assert(removeExecutor(manager, "1"))
@@ -448,9 +430,8 @@ class ExecutorAllocationManagerSuite
     onExecutorRemoved(manager, "3")
 
     // numExecutorsTarget is lower than minNumExecutors
-    post(sc.listenerBus,
-      SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null))
-    assert(executorIds(manager).size === 5)
+    post(SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null))
+    assert(manager.executorMonitor.executorCount === 5)
     assert(numExecutorsTarget(manager) === 5)
     assert(maxNumExecutorsNeeded(manager) == 4)
     assert(!removeExecutor(manager, "4")) // lower limit
@@ -458,9 +439,8 @@ class ExecutorAllocationManagerSuite
   }
 
   test ("interleaving add and remove") {
-    sc = createSparkContext(5, 12, 5)
-    val manager = sc.executorAllocationManager.get
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
+    val manager = createManager(createConf(5, 12, 5))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Add a few executors
     assert(addExecutors(manager) === 1)
@@ -473,7 +453,7 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "6")
     onExecutorAdded(manager, "7")
     onExecutorAdded(manager, "8")
-    assert(executorIds(manager).size === 8)
+    assert(manager.executorMonitor.executorCount === 8)
     assert(numExecutorsTarget(manager) === 8)
 
 
@@ -486,7 +466,7 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "10")
     onExecutorAdded(manager, "11")
     onExecutorAdded(manager, "12")
-    assert(executorIds(manager).size === 12)
+    assert(manager.executorMonitor.executorCount === 12)
     assert(numExecutorsTarget(manager) === 8)
 
     assert(removeExecutor(manager, "1"))
@@ -497,7 +477,7 @@ class ExecutorAllocationManagerSuite
     onExecutorRemoved(manager, "2")
     onExecutorRemoved(manager, "3")
     onExecutorRemoved(manager, "4")
-    assert(executorIds(manager).size === 8)
+    assert(manager.executorMonitor.executorCount === 8)
 
     // Add until limit
     assert(!removeExecutor(manager, "7")) // still at lower limit
@@ -506,34 +486,32 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "14")
     onExecutorAdded(manager, "15")
     onExecutorAdded(manager, "16")
-    assert(executorIds(manager).size === 12)
+    assert(manager.executorMonitor.executorCount === 12)
 
     // Remove succeeds again, now that we are no longer at the lower limit
     assert(removeExecutors(manager, Seq("5", "6", "7")) === Seq("5", "6", "7"))
     assert(removeExecutor(manager, "8"))
-    assert(executorIds(manager).size === 12)
+    assert(manager.executorMonitor.executorCount === 12)
     onExecutorRemoved(manager, "5")
     onExecutorRemoved(manager, "6")
-    assert(executorIds(manager).size === 10)
+    assert(manager.executorMonitor.executorCount === 10)
     assert(numExecutorsToAdd(manager) === 4)
     onExecutorRemoved(manager, "9")
     onExecutorRemoved(manager, "10")
     assert(addExecutors(manager) === 4) // at upper limit
     onExecutorAdded(manager, "17")
     onExecutorAdded(manager, "18")
-    assert(executorIds(manager).size === 10)
+    assert(manager.executorMonitor.executorCount === 10)
     assert(addExecutors(manager) === 0) // still at upper limit
     onExecutorAdded(manager, "19")
     onExecutorAdded(manager, "20")
-    assert(executorIds(manager).size === 12)
+    assert(manager.executorMonitor.executorCount === 12)
     assert(numExecutorsTarget(manager) === 12)
   }
 
   test("starting/canceling add timer") {
-    sc = createSparkContext(2, 10, 2)
     val clock = new ManualClock(8888L)
-    val manager = sc.executorAllocationManager.get
-    manager.setClock(clock)
+    val manager = createManager(createConf(2, 10, 2), clock = clock)
 
     // Starting add timer is idempotent
     assert(addTime(manager) === NOT_SET)
@@ -561,58 +539,9 @@ class ExecutorAllocationManagerSuite
     assert(firstAddTime !== secondAddTime)
   }
 
-  test("starting/canceling remove timers") {
-    sc = createSparkContext(2, 10, 2)
-    val clock = new ManualClock(14444L)
-    val manager = sc.executorAllocationManager.get
-    manager.setClock(clock)
-
-    executorIds(manager).asInstanceOf[mutable.Set[String]] ++= List("1", "2", 
"3")
-
-    // Starting remove timer is idempotent for each executor
-    assert(removeTimes(manager).isEmpty)
-    onExecutorIdle(manager, "1")
-    assert(removeTimes(manager).size === 1)
-    assert(removeTimes(manager).contains("1"))
-    val firstRemoveTime = removeTimes(manager)("1")
-    assert(firstRemoveTime === clock.getTimeMillis + executorIdleTimeout * 
1000)
-    clock.advance(100L)
-    onExecutorIdle(manager, "1")
-    assert(removeTimes(manager)("1") === firstRemoveTime) // timer is already 
started
-    clock.advance(200L)
-    onExecutorIdle(manager, "1")
-    assert(removeTimes(manager)("1") === firstRemoveTime)
-    clock.advance(300L)
-    onExecutorIdle(manager, "2")
-    assert(removeTimes(manager)("2") !== firstRemoveTime) // different executor
-    assert(removeTimes(manager)("2") === clock.getTimeMillis + 
executorIdleTimeout * 1000)
-    clock.advance(400L)
-    onExecutorIdle(manager, "3")
-    assert(removeTimes(manager)("3") !== firstRemoveTime)
-    assert(removeTimes(manager)("3") === clock.getTimeMillis + 
executorIdleTimeout * 1000)
-    assert(removeTimes(manager).size === 3)
-    assert(removeTimes(manager).contains("2"))
-    assert(removeTimes(manager).contains("3"))
-
-    // Restart remove timer
-    clock.advance(1000L)
-    onExecutorBusy(manager, "1")
-    assert(removeTimes(manager).size === 2)
-    onExecutorIdle(manager, "1")
-    assert(removeTimes(manager).size === 3)
-    assert(removeTimes(manager).contains("1"))
-    val secondRemoveTime = removeTimes(manager)("1")
-    assert(secondRemoveTime === clock.getTimeMillis + executorIdleTimeout * 
1000)
-    assert(removeTimes(manager)("1") === secondRemoveTime) // timer is already 
started
-    assert(removeTimes(manager)("1") !== firstRemoveTime)
-    assert(firstRemoveTime !== secondRemoveTime)
-  }
-
   test("mock polling loop with no events") {
-    sc = createSparkContext(0, 20, 0)
-    val manager = sc.executorAllocationManager.get
     val clock = new ManualClock(2020L)
-    manager.setClock(clock)
+    val manager = createManager(createConf(0, 20, 0), clock = clock)
 
     // No events - we should not be adding or removing
     assert(numExecutorsTarget(manager) === 0)
@@ -635,11 +564,9 @@ class ExecutorAllocationManagerSuite
   }
 
   test("mock polling loop add behavior") {
-    sc = createSparkContext(0, 20, 0)
     val clock = new ManualClock(2020L)
-    val manager = sc.executorAllocationManager.get
-    manager.setClock(clock)
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000)))
+    val manager = createManager(createConf(0, 20, 0), clock = clock)
+    post(SparkListenerStageSubmitted(createStageInfo(0, 1000)))
 
     // Scheduler queue backlogged
     onSchedulerBacklogged(manager)
@@ -685,24 +612,25 @@ class ExecutorAllocationManagerSuite
   }
 
   test("mock polling loop remove behavior") {
-    sc = createSparkContext(1, 20, 1)
     val clock = new ManualClock(2020L)
-    val manager = sc.executorAllocationManager.get
-    manager.setClock(clock)
+    val manager = createManager(createConf(1, 20, 1), clock = clock)
 
     // Remove idle executors on timeout
     onExecutorAdded(manager, "executor-1")
     onExecutorAdded(manager, "executor-2")
     onExecutorAdded(manager, "executor-3")
-    assert(removeTimes(manager).size === 3)
     assert(executorsPendingToRemove(manager).isEmpty)
+
+    // idle threshold not reached yet
     clock.advance(executorIdleTimeout * 1000 / 2)
     schedule(manager)
-    assert(removeTimes(manager).size === 3) // idle threshold not reached yet
+    assert(manager.executorMonitor.timedOutExecutors().isEmpty)
     assert(executorsPendingToRemove(manager).isEmpty)
+
+    // idle threshold exceeded
     clock.advance(executorIdleTimeout * 1000)
+    assert(manager.executorMonitor.timedOutExecutors().size === 3)
     schedule(manager)
-    assert(removeTimes(manager).isEmpty) // idle threshold exceeded
     assert(executorsPendingToRemove(manager).size === 2) // limit reached (1 executor remaining)
 
     // Mark a subset as busy - only idle executors should be removed
@@ -710,20 +638,20 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "executor-5")
     onExecutorAdded(manager, "executor-6")
     onExecutorAdded(manager, "executor-7")
-    assert(removeTimes(manager).size === 5)              // 5 active executors
+    assert(manager.executorMonitor.executorCount === 7)
     assert(executorsPendingToRemove(manager).size === 2) // 2 pending to be removed
     onExecutorBusy(manager, "executor-4")
     onExecutorBusy(manager, "executor-5")
     onExecutorBusy(manager, "executor-6") // 3 busy and 2 idle (of the 5 active ones)
+
+    // after scheduling, the previously timed out executor should be removed, since
+    // there are new active ones.
     schedule(manager)
-    assert(removeTimes(manager).size === 2) // remove only idle executors
-    assert(!removeTimes(manager).contains("executor-4"))
-    assert(!removeTimes(manager).contains("executor-5"))
-    assert(!removeTimes(manager).contains("executor-6"))
-    assert(executorsPendingToRemove(manager).size === 2)
+    assert(executorsPendingToRemove(manager).size === 3)
+
+    // advance the clock so that idle executors should time out and move to the pending list
     clock.advance(executorIdleTimeout * 1000)
     schedule(manager)
-    assert(removeTimes(manager).isEmpty) // idle executors are removed
     assert(executorsPendingToRemove(manager).size === 4)
     assert(!executorsPendingToRemove(manager).contains("executor-4"))
     assert(!executorsPendingToRemove(manager).contains("executor-5"))
@@ -734,152 +662,45 @@ class ExecutorAllocationManagerSuite
     onExecutorIdle(manager, "executor-5")
     onExecutorIdle(manager, "executor-6")
     schedule(manager)
-    assert(removeTimes(manager).size === 3) // 0 busy and 3 idle
-    assert(removeTimes(manager).contains("executor-4"))
-    assert(removeTimes(manager).contains("executor-5"))
-    assert(removeTimes(manager).contains("executor-6"))
     assert(executorsPendingToRemove(manager).size === 4)
     clock.advance(executorIdleTimeout * 1000)
     schedule(manager)
-    assert(removeTimes(manager).isEmpty)
     assert(executorsPendingToRemove(manager).size === 6) // limit reached (1 executor remaining)
   }
 
   test("listeners trigger add executors correctly") {
-    sc = createSparkContext(2, 10, 2)
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf(1, 20, 1))
     assert(addTime(manager) === NOT_SET)
 
     // Starting a stage should start the add timer
     val numTasks = 10
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, numTasks)))
+    post(SparkListenerStageSubmitted(createStageInfo(0, numTasks)))
     assert(addTime(manager) !== NOT_SET)
 
     // Starting a subset of the tasks should not cancel the add timer
     val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, "executor-1") }
-    taskInfos.tail.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
+    taskInfos.tail.foreach { info => post(SparkListenerTaskStart(0, 0, info)) }
     assert(addTime(manager) !== NOT_SET)
 
     // Starting all remaining tasks should cancel the add timer
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfos.head))
+    post(SparkListenerTaskStart(0, 0, taskInfos.head))
     assert(addTime(manager) === NOT_SET)
 
     // Start two different stages
     // The add timer should be canceled only if all tasks in both stages start running
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, numTasks)))
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, numTasks)))
+    post(SparkListenerStageSubmitted(createStageInfo(1, numTasks)))
+    post(SparkListenerStageSubmitted(createStageInfo(2, numTasks)))
     assert(addTime(manager) !== NOT_SET)
-    taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(1, 0, info)) }
+    taskInfos.foreach { info => post(SparkListenerTaskStart(1, 0, info)) }
     assert(addTime(manager) !== NOT_SET)
-    taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(2, 0, info)) }
+    taskInfos.foreach { info => post(SparkListenerTaskStart(2, 0, info)) }
     assert(addTime(manager) === NOT_SET)
   }
 
-  test("listeners trigger remove executors correctly") {
-    sc = createSparkContext(2, 10, 2)
-    val manager = sc.executorAllocationManager.get
-    assert(removeTimes(manager).isEmpty)
-
-    // Added executors should start the remove timers for each executor
-    (1 to 5).map("executor-" + _).foreach { id => onExecutorAdded(manager, id) 
}
-    assert(removeTimes(manager).size === 5)
-
-    // Starting a task cancel the remove timer for that executor
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, 
"executor-1")))
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, 
"executor-2")))
-    assert(removeTimes(manager).size === 3)
-    assert(!removeTimes(manager).contains("executor-1"))
-    assert(!removeTimes(manager).contains("executor-2"))
-
-    // Finishing all tasks running on an executor should start the remove 
timer for that executor
-    post(sc.listenerBus, SparkListenerTaskEnd(
-      0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new 
TaskMetrics))
-    post(sc.listenerBus, SparkListenerTaskEnd(
-      0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new 
TaskMetrics))
-    assert(removeTimes(manager).size === 4)
-    assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not 
finished yet
-    assert(removeTimes(manager).contains("executor-2"))
-    post(sc.listenerBus, SparkListenerTaskEnd(
-      0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new 
TaskMetrics))
-    assert(removeTimes(manager).size === 5)
-    assert(removeTimes(manager).contains("executor-1")) // executor-1 has now 
finished
-  }
-
-  test("listeners trigger add and remove executor callbacks correctly") {
-    sc = createSparkContext(2, 10, 2)
-    val manager = sc.executorAllocationManager.get
-    assert(executorIds(manager).isEmpty)
-    assert(removeTimes(manager).isEmpty)
-
-    // New executors have registered
-    post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
-    assert(executorIds(manager).size === 1)
-    assert(executorIds(manager).contains("executor-1"))
-    assert(removeTimes(manager).size === 1)
-    assert(removeTimes(manager).contains("executor-1"))
-    post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty, Map.empty)))
-    assert(executorIds(manager).size === 2)
-    assert(executorIds(manager).contains("executor-2"))
-    assert(removeTimes(manager).size === 2)
-    assert(removeTimes(manager).contains("executor-2"))
-
-    // Existing executors have disconnected
-    post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-1", ""))
-    assert(executorIds(manager).size === 1)
-    assert(!executorIds(manager).contains("executor-1"))
-    assert(removeTimes(manager).size === 1)
-    assert(!removeTimes(manager).contains("executor-1"))
-
-    // Unknown executor has disconnected
-    post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-3", ""))
-    assert(executorIds(manager).size === 1)
-    assert(removeTimes(manager).size === 1)
-  }
-
-  test("SPARK-4951: call onTaskStart before onExecutorAdded") {
-    sc = createSparkContext(2, 10, 2)
-    val manager = sc.executorAllocationManager.get
-    assert(executorIds(manager).isEmpty)
-    assert(removeTimes(manager).isEmpty)
-
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
-    post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
-    assert(executorIds(manager).size === 1)
-    assert(executorIds(manager).contains("executor-1"))
-    assert(removeTimes(manager).size === 0)
-  }
-
-  test("SPARK-4951: onExecutorAdded should not add a busy executor to 
removeTimes") {
-    sc = createSparkContext(2, 10)
-    val manager = sc.executorAllocationManager.get
-    assert(executorIds(manager).isEmpty)
-    assert(removeTimes(manager).isEmpty)
-    post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, 
"executor-1")))
-
-    assert(executorIds(manager).size === 1)
-    assert(executorIds(manager).contains("executor-1"))
-    assert(removeTimes(manager).size === 0)
-
-    post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
-    assert(executorIds(manager).size === 2)
-    assert(executorIds(manager).contains("executor-2"))
-    assert(removeTimes(manager).size === 1)
-    assert(removeTimes(manager).contains("executor-2"))
-    assert(!removeTimes(manager).contains("executor-1"))
-  }
-
   test("avoid ramp up when target < running executors") {
-    sc = createSparkContext(0, 100000, 0)
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf(0, 100000, 0))
     val stage1 = createStageInfo(0, 1000)
-    post(sc.listenerBus, SparkListenerStageSubmitted(stage1))
+    post(SparkListenerStageSubmitted(stage1))
 
     assert(addExecutors(manager) === 1)
     assert(addExecutors(manager) === 2)
@@ -889,22 +710,20 @@ class ExecutorAllocationManagerSuite
     (0 until 15).foreach { i =>
       onExecutorAdded(manager, s"executor-$i")
     }
-    assert(executorIds(manager).size === 15)
-    post(sc.listenerBus, SparkListenerStageCompleted(stage1))
+    assert(manager.executorMonitor.executorCount === 15)
+    post(SparkListenerStageCompleted(stage1))
 
     adjustRequestedExecutors(manager)
     assert(numExecutorsTarget(manager) === 0)
 
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 1000)))
+    post(SparkListenerStageSubmitted(createStageInfo(1, 1000)))
     addExecutors(manager)
     assert(numExecutorsTarget(manager) === 16)
   }
 
   test("avoid ramp down initial executors until first job is submitted") {
-    sc = createSparkContext(2, 5, 3)
-    val manager = sc.executorAllocationManager.get
     val clock = new ManualClock(10000L)
-    manager.setClock(clock)
+    val manager = createManager(createConf(2, 5, 3), clock = clock)
 
     // Verify the initial number of executors
     assert(numExecutorsTarget(manager) === 3)
@@ -912,7 +731,7 @@ class ExecutorAllocationManagerSuite
     // Verify whether the initial number of executors is kept with no pending 
tasks
     assert(numExecutorsTarget(manager) === 3)
 
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2)))
+    post(SparkListenerStageSubmitted(createStageInfo(1, 2)))
     clock.advance(100L)
 
     assert(maxNumExecutorsNeeded(manager) === 2)
@@ -923,10 +742,8 @@ class ExecutorAllocationManagerSuite
   }
 
   test("avoid ramp down initial executors until idle executor is timeout") {
-    sc = createSparkContext(2, 5, 3)
-    val manager = sc.executorAllocationManager.get
     val clock = new ManualClock(10000L)
-    manager.setClock(clock)
+    val manager = createManager(createConf(2, 5, 3), clock = clock)
 
     // Verify the initial number of executors
     assert(numExecutorsTarget(manager) === 3)
@@ -946,8 +763,7 @@ class ExecutorAllocationManagerSuite
   }
 
   test("get pending task number and related locality preference") {
-    sc = createSparkContext(2, 5, 3)
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf(2, 5, 3))
 
     val localityPreferences1 = Seq(
       Seq(TaskLocation("host1"), TaskLocation("host2"), TaskLocation("host3")),
@@ -957,7 +773,7 @@ class ExecutorAllocationManagerSuite
       Seq.empty
     )
     val stageInfo1 = createStageInfo(1, 5, localityPreferences1)
-    post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo1))
+    post(SparkListenerStageSubmitted(stageInfo1))
 
     assert(localityAwareTasks(manager) === 3)
     assert(hostToLocalTaskCount(manager) ===
@@ -969,45 +785,43 @@ class ExecutorAllocationManagerSuite
       Seq.empty
     )
     val stageInfo2 = createStageInfo(2, 3, localityPreferences2)
-    post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo2))
+    post(SparkListenerStageSubmitted(stageInfo2))
 
     assert(localityAwareTasks(manager) === 5)
     assert(hostToLocalTaskCount(manager) ===
       Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2))
 
-    post(sc.listenerBus, SparkListenerStageCompleted(stageInfo1))
+    post(SparkListenerStageCompleted(stageInfo1))
     assert(localityAwareTasks(manager) === 2)
     assert(hostToLocalTaskCount(manager) ===
       Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2))
   }
 
   test("SPARK-8366: maxNumExecutorsNeeded should properly handle failed tasks") {
-    sc = createSparkContext()
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf())
     assert(maxNumExecutorsNeeded(manager) === 0)
 
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1)))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     assert(maxNumExecutorsNeeded(manager) === 1)
 
     val taskInfo = createTaskInfo(1, 1, "executor-1")
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo))
+    post(SparkListenerTaskStart(0, 0, taskInfo))
     assert(maxNumExecutorsNeeded(manager) === 1)
 
     // If the task is failed, we expect it to be resubmitted later.
     val taskEndReason = ExceptionFailure(null, null, null, null, None)
-    post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
+    post(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null))
     assert(maxNumExecutorsNeeded(manager) === 1)
   }
 
   test("reset the state of allocation manager") {
-    sc = createSparkContext()
-    val manager = sc.executorAllocationManager.get
+    val manager = createManager(createConf())
     assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 1)
 
     // Allocation manager is reset when adding executor requests are sent without reporting back
     // executor added.
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10)))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 10)))
 
     assert(addExecutors(manager) === 1)
     assert(numExecutorsTarget(manager) === 2)
@@ -1019,10 +833,10 @@ class ExecutorAllocationManagerSuite
     manager.reset()
     assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 1)
-    assert(executorIds(manager) === Set.empty)
+    assert(manager.executorMonitor.executorCount === 0)
 
     // Allocation manager is reset when executors are added.
-    post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10)))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 10)))
 
     addExecutors(manager)
     addExecutors(manager)
@@ -1034,7 +848,7 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "third")
     onExecutorAdded(manager, "fourth")
     onExecutorAdded(manager, "fifth")
-    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth"))
+    assert(manager.executorMonitor.executorCount === 5)
 
     // Cluster manager lost will make all the live executors lost, so here simulate this behavior
     onExecutorRemoved(manager, "first")
@@ -1046,8 +860,7 @@ class ExecutorAllocationManagerSuite
     manager.reset()
     assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 1)
-    assert(executorIds(manager) === Set.empty)
-    assert(removeTimes(manager) === Map.empty)
+    assert(manager.executorMonitor.executorCount === 0)
 
     // Allocation manager is reset when executors are pending to remove
     addExecutors(manager)
@@ -1063,14 +876,12 @@ class ExecutorAllocationManagerSuite
     onExecutorAdded(manager, "sixth")
     onExecutorAdded(manager, "seventh")
     onExecutorAdded(manager, "eighth")
-    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
-      "sixth", "seventh", "eighth"))
+    assert(manager.executorMonitor.executorCount === 8)
 
     removeExecutor(manager, "first")
     removeExecutors(manager, Seq("second", "third"))
     assert(executorsPendingToRemove(manager) === Set("first", "second", "third"))
-    assert(executorIds(manager) === Set("first", "second", "third", "fourth", "fifth",
-      "sixth", "seventh", "eighth"))
+    assert(manager.executorMonitor.executorCount === 8)
 
 
     // Cluster manager lost will make all the live executors lost, so here simulate this behavior
@@ -1085,80 +896,64 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 1)
     assert(numExecutorsToAdd(manager) === 1)
     assert(executorsPendingToRemove(manager) === Set.empty)
-    assert(removeTimes(manager) === Map.empty)
+    assert(manager.executorMonitor.executorCount === 0)
   }
 
   test("SPARK-23365 Don't update target num executors when killing idle executors") {
-    val minExecutors = 1
-    val initialExecutors = 1
-    val maxExecutors = 2
-    val conf = new SparkConf()
-      .set(config.DYN_ALLOCATION_ENABLED, true)
-      .set(config.SHUFFLE_SERVICE_ENABLED, true)
-      .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
-      .set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors)
-      .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS, initialExecutors)
-      .set(config.DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms")
-      .set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key, "1000ms")
-      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "3000ms")
-    val mockAllocationClient = mock(classOf[ExecutorAllocationClient])
-    val mockBMM = mock(classOf[BlockManagerMaster])
-    val manager = new ExecutorAllocationManager(
-      mockAllocationClient, mock(classOf[LiveListenerBus]), conf, mockBMM)
     val clock = new ManualClock()
-    manager.setClock(clock)
+    val manager = createManager(
+      createConf(1, 2, 1).set(config.DYN_ALLOCATION_TESTING, false),
+      clock = clock)
 
-    when(mockAllocationClient.requestTotalExecutors(meq(2), any(), any())).thenReturn(true)
+    when(client.requestTotalExecutors(meq(2), any(), any())).thenReturn(true)
     // test setup -- job with 2 tasks, scale up to two executors
     assert(numExecutorsTarget(manager) === 1)
-    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+    post(SparkListenerExecutorAdded(
       clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
-    manager.listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 2)))
+    post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
     assert(numExecutorsTarget(manager) === 2)
     val taskInfo0 = createTaskInfo(0, 0, "executor-1")
-    manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo0))
-    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+    post(SparkListenerTaskStart(0, 0, taskInfo0))
+    post(SparkListenerExecutorAdded(
       clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     val taskInfo1 = createTaskInfo(1, 1, "executor-2")
-    manager.listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo1))
+    post(SparkListenerTaskStart(0, 0, taskInfo1))
     assert(numExecutorsTarget(manager) === 2)
 
     // have one task finish -- we should adjust the target number of executors down
     // but we should *not* kill any executors yet
-    manager.listener.onTaskEnd(SparkListenerTaskEnd(0, 0, null, Success, taskInfo0, null))
+    post(SparkListenerTaskEnd(0, 0, null, Success, taskInfo0, null))
     assert(maxNumExecutorsNeeded(manager) === 1)
     assert(numExecutorsTarget(manager) === 2)
     clock.advance(1000)
     manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
     assert(numExecutorsTarget(manager) === 1)
-    verify(mockAllocationClient, never).killExecutors(any(), any(), any(), any())
+    verify(client, never).killExecutors(any(), any(), any(), any())
 
     // now we cross the idle timeout for executor-1, so we kill it.  the really important
     // thing here is that we do *not* ask the executor allocation client to adjust the target
     // number of executors down
-    when(mockAllocationClient.killExecutors(Seq("executor-1"), false, false, false))
+    when(client.killExecutors(Seq("executor-1"), false, false, false))
       .thenReturn(Seq("executor-1"))
     clock.advance(3000)
     schedule(manager)
     assert(maxNumExecutorsNeeded(manager) === 1)
     assert(numExecutorsTarget(manager) === 1)
     // here's the important verify -- we did kill the executors, but did not adjust the target count
-    verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
+    verify(client).killExecutors(Seq("executor-1"), false, false, false)
   }
 
   test("SPARK-26758 check executor target number after idle time out ") {
-    sc = createSparkContext(1, 5, 3)
-    val manager = sc.executorAllocationManager.get
     val clock = new ManualClock(10000L)
-    manager.setClock(clock)
+    val manager = createManager(createConf(1, 5, 3), clock = clock)
     assert(numExecutorsTarget(manager) === 3)
-    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+    post(SparkListenerExecutorAdded(
       clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
-    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+    post(SparkListenerExecutorAdded(
       clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 2, Map.empty)))
-    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+    post(SparkListenerExecutorAdded(
       clock.getTimeMillis(), "executor-3", new ExecutorInfo("host1", 3, Map.empty)))
     // make all the executors as idle, so that it will be killed
     clock.advance(executorIdleTimeout * 1000)
@@ -1167,36 +962,11 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 1)
   }
 
-  test("SPARK-26927 call onExecutorRemoved before onTaskStart") {
-    sc = createSparkContext(2, 5)
-    val manager = sc.executorAllocationManager.get
-    assert(executorIds(manager).isEmpty)
-    post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
-    post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "2", new ExecutorInfo("host2", 1, Map.empty, Map.empty)))
-    post(sc.listenerBus, SparkListenerExecutorAdded(
-      0L, "3", new ExecutorInfo("host3", 1, Map.empty, Map.empty)))
-    assert(executorIds(manager).size === 3)
-
-    post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "3", "disconnected"))
-    assert(executorIds(manager).size === 2)
-    assert(executorIds(manager) === Set("1", "2"))
-
-    val taskInfo1 = createTaskInfo(0, 0, "3")
-    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
-    // Verify taskStart not adding already removed executors.
-    assert(executorIds(manager).size === 2)
-    assert(executorIds(manager) === Set("1", "2"))
-  }
-
-  private def createSparkContext(
+  private def createConf(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,
-      initialExecutors: Int = 1): SparkContext = {
-    val conf = new SparkConf()
-      .setMaster("myDummyLocalExternalClusterManager")
-      .setAppName("test-executor-allocation-manager")
+      initialExecutors: Int = 1): SparkConf = {
+    new SparkConf()
       .set(config.DYN_ALLOCATION_ENABLED, true)
       .set(config.DYN_ALLOCATION_MIN_EXECUTORS, minExecutors)
       .set(config.DYN_ALLOCATION_MAX_EXECUTORS, maxExecutors)
@@ -1206,15 +976,49 @@ class ExecutorAllocationManagerSuite
       .set(config.DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key,
         s"${sustainedSchedulerBacklogTimeout.toString}s")
       .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, 
s"${executorIdleTimeout.toString}s")
+      .set(config.SHUFFLE_SERVICE_ENABLED, true)
       .set(config.DYN_ALLOCATION_TESTING, true)
       // SPARK-22864: effectively disable the allocation schedule by setting the period to a
       // really long value.
       .set(TEST_SCHEDULE_INTERVAL, 10000L)
-    val sc = new SparkContext(conf)
-    contexts += sc
-    sc
   }
 
+  private def createManager(
+      conf: SparkConf,
+      clock: Clock = new SystemClock()): ExecutorAllocationManager = {
+    val manager = new ExecutorAllocationManager(client, listenerBus, conf, 
clock)
+    managers += manager
+    manager.start()
+    manager
+  }
+
+  private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): 
Unit = {
+    post(SparkListenerExecutorAdded(0L, id, null))
+  }
+
+  private def onExecutorRemoved(manager: ExecutorAllocationManager, id: 
String): Unit = {
+    post(SparkListenerExecutorRemoved(0L, id, null))
+  }
+
+  private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): 
Unit = {
+    val info = new TaskInfo(1, 1, 1, 0, id, "foo.example.com", 
TaskLocality.PROCESS_LOCAL, false)
+    post(SparkListenerTaskStart(1, 1, info))
+  }
+
+  private def onExecutorIdle(manager: ExecutorAllocationManager, id: String): 
Unit = {
+    val info = new TaskInfo(1, 1, 1, 0, id, "foo.example.com", 
TaskLocality.PROCESS_LOCAL, false)
+    info.markFinished(TaskState.FINISHED, 1)
+    post(SparkListenerTaskEnd(1, 1, "foo", Success, info, null))
+  }
+
+  private def removeExecutor(manager: ExecutorAllocationManager, executorId: 
String): Boolean = {
+    val executorsRemoved = removeExecutors(manager, Seq(executorId))
+    executorsRemoved.nonEmpty && executorsRemoved(0) == executorId
+  }
+
+  private def executorsPendingToRemove(manager: ExecutorAllocationManager): 
Set[String] = {
+    manager.executorMonitor.executorsPendingToRemove()
+  }
 }
 
 /**
@@ -1243,7 +1047,6 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
     new TaskInfo(taskId, taskIndex, 0, 0, executorId, "", TaskLocality.ANY, 
speculative)
   }
 
-
   /* ------------------------------------------------------- *
    | Helper methods for accessing private methods and fields |
    * ------------------------------------------------------- */
@@ -1251,23 +1054,14 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
   private val _numExecutorsToAdd = PrivateMethod[Int]('numExecutorsToAdd)
   private val _numExecutorsTarget = PrivateMethod[Int]('numExecutorsTarget)
   private val _maxNumExecutorsNeeded = 
PrivateMethod[Int]('maxNumExecutorsNeeded)
-  private val _executorsPendingToRemove =
-    PrivateMethod[collection.Set[String]]('executorsPendingToRemove)
-  private val _executorIds = 
PrivateMethod[collection.Set[String]]('executorIds)
   private val _addTime = PrivateMethod[Long]('addTime)
-  private val _removeTimes = PrivateMethod[collection.Map[String, 
Long]]('removeTimes)
   private val _schedule = PrivateMethod[Unit]('schedule)
   private val _addExecutors = PrivateMethod[Int]('addExecutors)
   private val _updateAndSyncNumExecutorsTarget =
     PrivateMethod[Int]('updateAndSyncNumExecutorsTarget)
-  private val _removeExecutor = PrivateMethod[Boolean]('removeExecutor)
   private val _removeExecutors = PrivateMethod[Seq[String]]('removeExecutors)
-  private val _onExecutorAdded = PrivateMethod[Unit]('onExecutorAdded)
-  private val _onExecutorRemoved = PrivateMethod[Unit]('onExecutorRemoved)
   private val _onSchedulerBacklogged = 
PrivateMethod[Unit]('onSchedulerBacklogged)
   private val _onSchedulerQueueEmpty = 
PrivateMethod[Unit]('onSchedulerQueueEmpty)
-  private val _onExecutorIdle = PrivateMethod[Unit]('onExecutorIdle)
-  private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
   private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
   private val _hostToLocalTaskCount = PrivateMethod[Map[String, 
Int]]('hostToLocalTaskCount)
   private val _onSpeculativeTaskSubmitted = 
PrivateMethod[Unit]('onSpeculativeTaskSubmitted)
@@ -1281,23 +1075,10 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
     manager invokePrivate _numExecutorsTarget()
   }
 
-  private def executorsPendingToRemove(
-      manager: ExecutorAllocationManager): collection.Set[String] = {
-    manager invokePrivate _executorsPendingToRemove()
-  }
-
-  private def executorIds(manager: ExecutorAllocationManager): 
collection.Set[String] = {
-    manager invokePrivate _executorIds()
-  }
-
   private def addTime(manager: ExecutorAllocationManager): Long = {
     manager invokePrivate _addTime()
   }
 
-  private def removeTimes(manager: ExecutorAllocationManager): 
collection.Map[String, Long] = {
-    manager invokePrivate _removeTimes()
-  }
-
   private def schedule(manager: ExecutorAllocationManager): Unit = {
     manager invokePrivate _schedule()
   }
@@ -1315,22 +1096,10 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
     manager invokePrivate _updateAndSyncNumExecutorsTarget(0L)
   }
 
-  private def removeExecutor(manager: ExecutorAllocationManager, id: String): 
Boolean = {
-    manager invokePrivate _removeExecutor(id)
-  }
-
   private def removeExecutors(manager: ExecutorAllocationManager, ids: 
Seq[String]): Seq[String] = {
     manager invokePrivate _removeExecutors(ids)
   }
 
-  private def onExecutorAdded(manager: ExecutorAllocationManager, id: String): 
Unit = {
-    manager invokePrivate _onExecutorAdded(id)
-  }
-
-  private def onExecutorRemoved(manager: ExecutorAllocationManager, id: 
String): Unit = {
-    manager invokePrivate _onExecutorRemoved(id)
-  }
-
   private def onSchedulerBacklogged(manager: ExecutorAllocationManager): Unit 
= {
     manager invokePrivate _onSchedulerBacklogged()
   }
@@ -1339,14 +1108,6 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
     manager invokePrivate _onSchedulerQueueEmpty()
   }
 
-  private def onExecutorIdle(manager: ExecutorAllocationManager, id: String): 
Unit = {
-    manager invokePrivate _onExecutorIdle(id)
-  }
-
-  private def onExecutorBusy(manager: ExecutorAllocationManager, id: String): 
Unit = {
-    manager invokePrivate _onExecutorBusy(id)
-  }
-
   private def onSpeculativeTaskSubmitted(manager: ExecutorAllocationManager, 
id: String) : Unit = {
     manager invokePrivate _onSpeculativeTaskSubmitted(id)
   }
@@ -1363,66 +1124,3 @@ private object ExecutorAllocationManagerSuite extends 
PrivateMethodTester {
     manager invokePrivate _hostToLocalTaskCount()
   }
 }
-
-/**
- * A cluster manager which wraps around the scheduler and backend for local mode. It is used for
- * testing the dynamic allocation policy.
- */
-private class DummyLocalExternalClusterManager extends ExternalClusterManager {
-
-  def canCreate(masterURL: String): Boolean = masterURL == 
"myDummyLocalExternalClusterManager"
-
-  override def createTaskScheduler(
-      sc: SparkContext,
-      masterURL: String): TaskScheduler = new TaskSchedulerImpl(sc, 1, isLocal 
= true)
-
-  override def createSchedulerBackend(
-      sc: SparkContext,
-      masterURL: String,
-      scheduler: TaskScheduler): SchedulerBackend = {
-    val sb = new LocalSchedulerBackend(sc.getConf, 
scheduler.asInstanceOf[TaskSchedulerImpl], 1)
-    new DummyLocalSchedulerBackend(sc, sb)
-  }
-
-  override def initialize(scheduler: TaskScheduler, backend: 
SchedulerBackend): Unit = {
-    val sc = scheduler.asInstanceOf[TaskSchedulerImpl]
-    sc.initialize(backend)
-  }
-}
-
-/**
- * A scheduler backend which wraps around local scheduler backend and exposes the executor
- * allocation client interface for testing dynamic allocation.
- */
-private class DummyLocalSchedulerBackend (sc: SparkContext, sb: 
SchedulerBackend)
-  extends SchedulerBackend with ExecutorAllocationClient {
-
-  override private[spark] def getExecutorIds(): Seq[String] = Nil
-
-  override private[spark] def requestTotalExecutors(
-      numExecutors: Int,
-      localityAwareTasks: Int,
-      hostToLocalTaskCount: Map[String, Int]): Boolean = true
-
-  override def requestExecutors(numAdditionalExecutors: Int): Boolean = true
-
-  override def killExecutors(
-      executorIds: Seq[String],
-      adjustTargetNumExecutors: Boolean,
-      countFailures: Boolean,
-      force: Boolean): Seq[String] = executorIds
-
-  override def start(): Unit = sb.start()
-
-  override def stop(): Unit = sb.stop()
-
-  override def reviveOffers(): Unit = sb.reviveOffers()
-
-  override def defaultParallelism(): Int = sb.defaultParallelism()
-
-  override def maxNumConcurrentTasks(): Int = sb.maxNumConcurrentTasks()
-
-  override def killExecutorsOnHost(host: String): Boolean = {
-    false
-  }
-}
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
 
b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
index e33c3f8..e38951f 100644
--- 
a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceDbSuite.scala
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets
 import com.google.common.io.CharStreams
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.internal.config._
 import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, 
ExternalShuffleBlockResolver}
 import org.apache.spark.network.shuffle.TestShuffleDataContext
 import org.apache.spark.util.Utils
@@ -70,10 +71,12 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
     }
   }
 
+  def shuffleServiceConf: SparkConf = 
sparkConf.clone().set(SHUFFLE_SERVICE_PORT, 0)
+
   def registerExecutor(): Unit = {
     try {
       sparkConf.set("spark.shuffle.service.db.enabled", "true")
-      externalShuffleService = new ExternalShuffleService(sparkConf, 
securityManager)
+      externalShuffleService = new ExternalShuffleService(shuffleServiceConf, 
securityManager)
 
       // external Shuffle Service start
       externalShuffleService.start()
@@ -94,7 +97,7 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
     "shuffle service restart") {
     try {
       sparkConf.set("spark.shuffle.service.db.enabled", "true")
-      externalShuffleService = new ExternalShuffleService(sparkConf, 
securityManager)
+      externalShuffleService = new ExternalShuffleService(shuffleServiceConf, 
securityManager)
       // externalShuffleService restart
       externalShuffleService.start()
       blockHandler = externalShuffleService.getBlockHandler
@@ -120,7 +123,7 @@ class ExternalShuffleServiceDbSuite extends SparkFunSuite {
     " shuffle service restart") {
     try {
       sparkConf.set("spark.shuffle.service.db.enabled", "false")
-      externalShuffleService = new ExternalShuffleService(sparkConf, 
securityManager)
+      externalShuffleService = new ExternalShuffleService(shuffleServiceConf, 
securityManager)
       // externalShuffleService restart
       externalShuffleService.start()
       blockHandler = externalShuffleService.getBlockHandler
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
new file mode 100644
index 0000000..8d1577e
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.dynalloc
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, when}
+
+import org.apache.spark._
+import org.apache.spark.internal.config._
+import org.apache.spark.scheduler._
+import org.apache.spark.storage._
+import org.apache.spark.util.ManualClock
+
+class ExecutorMonitorSuite extends SparkFunSuite {
+
+  private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(60L)
+  private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(120L)
+
+  private val conf = new SparkConf()
+    .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s")
+    .set(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT.key, "120s")
+
+  private var monitor: ExecutorMonitor = _
+  private var client: ExecutorAllocationClient = _
+  private var clock: ManualClock = _
+
+  // List of known executors. Allows easily mocking which executors are alive without
+  // having to use mockito APIs directly in each test.
+  private val knownExecs = mutable.HashSet[String]()
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    knownExecs.clear()
+    clock = new ManualClock()
+    client = mock(classOf[ExecutorAllocationClient])
+    when(client.isExecutorActive(any())).thenAnswer { invocation =>
+      knownExecs.contains(invocation.getArguments()(0).asInstanceOf[String])
+    }
+    monitor = new ExecutorMonitor(conf, client, clock)
+  }
+
+  test("basic executor timeout") {
+    knownExecs += "1"
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    assert(monitor.executorCount === 1)
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+  }
+
+  test("SPARK-4951, SPARK-26927: handle out of order task start events") {
+    knownExecs ++= Set("1", "2")
+
+    monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
+    assert(monitor.executorCount === 1)
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    assert(monitor.executorCount === 1)
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"2", null))
+    assert(monitor.executorCount === 2)
+
+    
monitor.onExecutorRemoved(SparkListenerExecutorRemoved(clock.getTimeMillis(), 
"2", null))
+    assert(monitor.executorCount === 1)
+
+    knownExecs -= "2"
+
+    monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("2", 2)))
+    assert(monitor.executorCount === 1)
+  }
+
+  test("track tasks running on executor") {
+    knownExecs += "1"
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
+    assert(!monitor.isExecutorIdle("1"))
+
+    // Start/end a few tasks and make sure the executor does not go idle.
+    (2 to 10).foreach { i =>
+      monitor.onTaskStart(SparkListenerTaskStart(i, 1, taskInfo("1", 1)))
+      assert(!monitor.isExecutorIdle("1"))
+
+      monitor.onTaskEnd(SparkListenerTaskEnd(i, 1, "foo", Success, 
taskInfo("1", 1), null))
+      assert(!monitor.isExecutorIdle("1"))
+    }
+
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, taskInfo("1", 
1), null))
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(clock.getTimeMillis()).isEmpty)
+    assert(monitor.timedOutExecutors(clock.getTimeMillis() + idleTimeoutMs + 
1) === Seq("1"))
+  }
+
+  test("use appropriate time out depending on whether blocks are stored") {
+    knownExecs += "1"
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
+
+    monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.NONE))
+    assert(monitor.isExecutorIdle("1"))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("1", 1)))
+    assert(!monitor.isExecutorIdle("1"))
+    monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
+    assert(!monitor.isExecutorIdle("1"))
+    monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = StorageLevel.NONE))
+    assert(!monitor.isExecutorIdle("1"))
+  }
+
+  test("keeps track of stored blocks for each rdd and split") {
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+
+    monitor.onBlockUpdated(rddUpdate(1, 0, "1"))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
+
+    monitor.onBlockUpdated(rddUpdate(1, 1, "1"))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
+
+    monitor.onBlockUpdated(rddUpdate(2, 0, "1"))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
+
+    monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = StorageLevel.NONE))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
+
+    monitor.onUnpersistRDD(SparkListenerUnpersistRDD(1))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) === Seq("1"))
+
+    // Make sure that if we get an unpersist event much later, which moves an executor from having
+    // cached blocks to no longer having cached blocks, it will time out based on the time it
+    // originally went idle.
+    clock.setTime(idleDeadline)
+    monitor.onUnpersistRDD(SparkListenerUnpersistRDD(2))
+    assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
+  }
+
+  test("handle timeouts correctly with multiple executors") {
+    knownExecs ++= Set("1", "2", "3")
+
+    // start exec 1 at 0s (should idle time out at 60s)
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    assert(monitor.isExecutorIdle("1"))
+
+    // start exec 2 at 30s, store a block (should idle time out at 150s)
+    clock.setTime(TimeUnit.SECONDS.toMillis(30))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"2", null))
+    monitor.onBlockUpdated(rddUpdate(1, 0, "2"))
+    assert(monitor.isExecutorIdle("2"))
+    assert(!monitor.timedOutExecutors(idleDeadline).contains("2"))
+
+    // start exec 3 at 60s (should idle timeout at 120s, exec 1 should time out)
+    clock.setTime(TimeUnit.SECONDS.toMillis(60))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"3", null))
+    assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
+
+    // store block on exec 3 (should now idle time out at 180s)
+    monitor.onBlockUpdated(rddUpdate(1, 0, "3"))
+    assert(monitor.isExecutorIdle("3"))
+    assert(!monitor.timedOutExecutors(idleDeadline).contains("3"))
+
+    // advance to 140s, remove block from exec 3 (time out immediately)
+    clock.setTime(TimeUnit.SECONDS.toMillis(140))
+    monitor.onBlockUpdated(rddUpdate(1, 0, "3", level = StorageLevel.NONE))
+    assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", 
"3"))
+
+    // advance to 150s, now exec 2 should time out
+    clock.setTime(TimeUnit.SECONDS.toMillis(150))
+    assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", 
"2", "3"))
+  }
+
+  test("SPARK-27677: don't track blocks stored on disk when using shuffle 
service") {
+    // First make sure that blocks on disk are counted when no shuffle service is available.
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = 
StorageLevel.DISK_ONLY))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) ===  Seq("1"))
+
+    conf.set(SHUFFLE_SERVICE_ENABLED, 
true).set(SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
+    monitor = new ExecutorMonitor(conf, client, clock)
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = 
StorageLevel.MEMORY_ONLY))
+    monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = 
StorageLevel.MEMORY_ONLY))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) ===  Seq("1"))
+
+    monitor.onBlockUpdated(rddUpdate(1, 0, "1", level = 
StorageLevel.DISK_ONLY))
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+    assert(monitor.timedOutExecutors(storageDeadline) ===  Seq("1"))
+
+    monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = 
StorageLevel.DISK_ONLY))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+
+    // Tag the block as being both in memory and on disk, which may happen after it was
+    // evicted and then restored into memory. Since it's still on disk the executor should
+    // still be eligible for removal.
+    monitor.onBlockUpdated(rddUpdate(1, 1, "1", level = 
StorageLevel.MEMORY_AND_DISK))
+    assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
+  }
+
+  test("track executors pending for removal") {
+    knownExecs ++= Set("1", "2", "3")
+
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"1", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"2", null))
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), 
"3", null))
+    clock.setTime(idleDeadline)
+    assert(monitor.timedOutExecutors().toSet === Set("1", "2", "3"))
+    assert(monitor.pendingRemovalCount === 0)
+
+    // Notify that only a subset of executors was killed, to mimic the case where the scheduler
+    // refuses to kill an executor that is busy for whatever reason the monitor hasn't detected yet.
+    monitor.executorsKilled(Seq("1"))
+    assert(monitor.timedOutExecutors().toSet === Set("2", "3"))
+    assert(monitor.pendingRemovalCount === 1)
+
+    // Check the timed out executors again so that we're sure they're still timed out when no
+    // events happen. This ensures that the monitor doesn't lose track of them.
+    assert(monitor.timedOutExecutors().toSet === Set("2", "3"))
+
+    monitor.onTaskStart(SparkListenerTaskStart(1, 1, taskInfo("2", 1)))
+    assert(monitor.timedOutExecutors().toSet === Set("3"))
+
+    monitor.executorsKilled(Seq("3"))
+    assert(monitor.pendingRemovalCount === 2)
+
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, taskInfo("2", 
1), null))
+    assert(monitor.timedOutExecutors().isEmpty)
+    clock.advance(idleDeadline)
+    assert(monitor.timedOutExecutors().toSet === Set("2"))
+  }
+
+  private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
+  private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs 
+ 1
+
+  private def taskInfo(
+      execId: String,
+      id: Int,
+      speculative: Boolean = false,
+      duration: Long = -1L): TaskInfo = {
+    val start = if (duration > 0) clock.getTimeMillis() - duration else 
clock.getTimeMillis()
+    val task = new TaskInfo(id, id, 1, start, execId, "foo.example.com",
+      TaskLocality.PROCESS_LOCAL, speculative)
+    if (duration > 0) {
+      task.markFinished(TaskState.FINISHED, math.max(1, clock.getTimeMillis()))
+    }
+    task
+  }
+
+  private def rddUpdate(
+      rddId: Int,
+      splitIndex: Int,
+      execId: String,
+      level: StorageLevel = StorageLevel.MEMORY_ONLY): 
SparkListenerBlockUpdated = {
+    SparkListenerBlockUpdated(
+      BlockUpdatedInfo(BlockManagerId(execId, "1.example.com", 42),
+        RDDBlockId(rddId, splitIndex), level, 1L, 0L))
+  }
+
+}
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
index 8df1232..01e3d6a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerInfoSuite.scala
@@ -47,7 +47,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
       broadcastId, StorageLevel.MEMORY_AND_DISK, memSize = 200, diskSize = 100)
     assert(bmInfo.blocks.asScala ===
       Map(broadcastId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 100)))
-    assert(bmInfo.exclusiveCachedBlocks.isEmpty)
     assert(bmInfo.remainingMem === 29800)
   }
 
@@ -56,7 +55,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     bmInfo.updateBlockInfo(rddId, StorageLevel.MEMORY_ONLY, memSize = 200, 
diskSize = 0)
     assert(bmInfo.blocks.asScala ===
       Map(rddId -> BlockStatus(StorageLevel.MEMORY_ONLY, 200, 0)))
-    assert(bmInfo.exclusiveCachedBlocks === Set(rddId))
     assert(bmInfo.remainingMem === 29800)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
@@ -70,8 +68,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     bmInfo.updateBlockInfo(rddId, StorageLevel.MEMORY_AND_DISK, memSize = 200, 
diskSize = 400)
     assert(bmInfo.blocks.asScala ===
       Map(rddId -> BlockStatus(StorageLevel.MEMORY_AND_DISK, 0, 400)))
-    val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else 
Set(rddId)
-    assert(bmInfo.exclusiveCachedBlocks === 
exclusiveCachedBlocksForOneMemoryOnly)
     assert(bmInfo.remainingMem === 29800)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@@ -85,7 +81,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     assert(bmInfo.blocks.asScala ===
       Map(rddId -> BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
     val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else 
Set(rddId)
-    assert(bmInfo.exclusiveCachedBlocks === 
exclusiveCachedBlocksForOneMemoryOnly)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@@ -99,7 +94,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     val rddId: BlockId = RDDBlockId(0, 0)
     bmInfo.updateBlockInfo(rddId, StorageLevel.MEMORY_ONLY, memSize = 200, 0)
     assert(bmInfo.blocks.asScala  === Map(rddId -> 
BlockStatus(StorageLevel.MEMORY_ONLY, 200, 0)))
-    assert(bmInfo.exclusiveCachedBlocks === Set(rddId))
     assert(bmInfo.remainingMem === 29800)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
@@ -107,8 +101,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
 
     bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, 
diskSize = 200)
     assert(bmInfo.blocks.asScala === Map(rddId -> 
BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
-    val exclusiveCachedBlocksForNoMemoryOnly = if (svcEnabled) Set() else 
Set(rddId)
-    assert(bmInfo.exclusiveCachedBlocks === 
exclusiveCachedBlocksForNoMemoryOnly)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@@ -120,8 +112,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     val rddId: BlockId = RDDBlockId(0, 0)
     bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, 
diskSize = 200)
     assert(bmInfo.blocks.asScala === Map(rddId -> 
BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
-    val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else 
Set(rddId)
-    assert(bmInfo.exclusiveCachedBlocks === 
exclusiveCachedBlocksForOneMemoryOnly)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@@ -130,7 +120,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
 
     bmInfo.updateBlockInfo(rddId, StorageLevel.NONE, memSize = 0, diskSize = 
200)
     assert(bmInfo.blocks.isEmpty)
-    assert(bmInfo.exclusiveCachedBlocks.isEmpty)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
@@ -141,8 +130,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
     val rddId: BlockId = RDDBlockId(0, 0)
     bmInfo.updateBlockInfo(rddId, StorageLevel.DISK_ONLY, memSize = 0, 
diskSize = 200)
     assert(bmInfo.blocks.asScala === Map(rddId -> 
BlockStatus(StorageLevel.DISK_ONLY, 0, 200)))
-    val exclusiveCachedBlocksForOneMemoryOnly = if (svcEnabled) Set() else 
Set(rddId)
-    assert(bmInfo.exclusiveCachedBlocks === 
exclusiveCachedBlocksForOneMemoryOnly)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.asScala ===
@@ -151,7 +138,6 @@ class BlockManagerInfoSuite extends SparkFunSuite {
 
     bmInfo.removeBlock(rddId)
     assert(bmInfo.blocks.asScala.isEmpty)
-    assert(bmInfo.exclusiveCachedBlocks.isEmpty)
     assert(bmInfo.remainingMem === 30000)
     if (svcEnabled) {
       assert(bmInfo.externalShuffleServiceBlockStatus.get.isEmpty)
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
index d072b99..a8b0055 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ExecutorAllocationManagerSuite.scala
@@ -388,7 +388,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite
   }
 
   private def withStreamingContext(conf: SparkConf)(body: StreamingContext => 
Unit): Unit = {
-    conf.setMaster("myDummyLocalExternalClusterManager")
+    conf.setMaster("local-cluster[1,1,1024]")
       .setAppName(this.getClass.getSimpleName)
       .set("spark.streaming.dynamicAllocation.testing", "true")  // to test 
dynamic allocation
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to