This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dca8380  [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
dca8380 is described below

commit dca838058ffd0e2c01591fd9ab0f192de446d606
Author: Marcelo Vanzin <van...@cloudera.com>
AuthorDate: Thu Jan 16 13:37:11 2020 -0800

    [SPARK-29950][K8S] Blacklist deleted executors in K8S with dynamic allocation
    
    The issue here is that when Spark is downscaling the application and deletes
    a few pod requests that aren't needed anymore, it may actually race with the
    K8S scheduler, which may be bringing up those executors. So they may have enough
    time to connect back to the driver and register, only to be deleted soon after.
    This wastes resources and causes misleading entries in the driver log.
    
    The change (ab)uses the blacklisting mechanism to consider the deleted excess
    pods as blacklisted, so that if they try to connect back, the driver will deny
    it.
    
    It also changes the executor registration slightly, since even with the above
    change there were misleading logs. That was because the executor registration
    message was an RPC that always succeeded (bar network issues), so the executor
    would always try to send an unregistration message to the driver, which would
    then log several messages about not knowing anything about the executor. The
    change makes the registration RPC succeed or fail directly, instead of using
    the separate failure message that would lead to this issue.
    
    Note the last change required some changes in a standalone test suite related
    to dynamic allocation, since it relied on the driver not throwing exceptions
    when a duplicate executor registration happened.
    
    Tested with existing unit tests, and with a live cluster with dynamic allocation on.
    
    Closes #26586 from vanzin/SPARK-29950.
    
    Authored-by: Marcelo Vanzin <van...@cloudera.com>
    Signed-off-by: Marcelo Vanzin <van...@cloudera.com>
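
Aside for readers skimming the patch: the snippet below is a minimal, self-contained Scala sketch of the registration-veto pattern described above, not code from the patch itself. Except for the `isBlacklisted` hook, all names are invented stand-ins for the real Spark types (CoarseGrainedSchedulerBackend, KubernetesClusterSchedulerBackend, ExecutorPodsAllocator), which appear in the diff that follows.

// Sketch only: simplified stand-ins, not the actual Spark classes.
object RegistrationVetoSketch {

  // Base scheduler backend exposing a hook that a cluster manager can override
  // to veto an executor registration.
  class BaseBackend {
    protected def isBlacklisted(executorId: String, hostname: String): Boolean = false

    // Mirrors an RPC that succeeds or fails directly instead of replying with a
    // separate failure message: Right(()) on success, Left(error) when denied.
    def registerExecutor(executorId: String, hostname: String): Either[String, Unit] =
      if (isBlacklisted(executorId, hostname)) Left(s"Executor is blacklisted: $executorId")
      else Right(())
  }

  // A K8S-flavored backend that denies executors whose pods were already deleted.
  class K8sLikeBackend(deletedExecutorIds: () => Set[Long]) extends BaseBackend {
    override protected def isBlacklisted(executorId: String, hostname: String): Boolean =
      deletedExecutorIds().contains(executorId.toLong)
  }

  def main(args: Array[String]): Unit = {
    val deleted = Set(3L, 4L)                        // pods the allocator asked K8S to delete
    val backend = new K8sLikeBackend(() => deleted)
    println(backend.registerExecutor("1", "node-a")) // Right(()): registration accepted
    println(backend.registerExecutor("3", "node-b")) // Left(...): registration denied
  }
}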
---
 .../executor/CoarseGrainedExecutorBackend.scala    | 14 +++--
 .../cluster/CoarseGrainedClusterMessage.scala      |  7 ---
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 19 +++++--
 .../deploy/StandaloneDynamicAllocationSuite.scala  | 65 ++++++++++++++--------
 .../CoarseGrainedSchedulerBackendSuite.scala       |  1 +
 .../cluster/k8s/ExecutorPodsAllocator.scala        | 18 ++++++
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  4 ++
 .../DeterministicExecutorPodsSnapshotsStore.scala  |  9 +++
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 11 ++++
 9 files changed, 105 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index b1837c9..1fe901a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -54,6 +54,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     resourcesFileOpt: Option[String])
   extends IsolatedRpcEndpoint with ExecutorBackend with Logging {
 
+  import CoarseGrainedExecutorBackend._
+
   private implicit val formats = DefaultFormats
 
   private[this] val stopping = new AtomicBoolean(false)
@@ -80,9 +82,8 @@ private[spark] class CoarseGrainedExecutorBackend(
       ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
         extractAttributes, resources))
     }(ThreadUtils.sameThread).onComplete {
-      // This is a very fast action so we can use "ThreadUtils.sameThread"
-      case Success(msg) =>
-        // Always receive `true`. Just ignore it
+      case Success(_) =>
+        self.send(RegisteredExecutor)
       case Failure(e) =>
         exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
     }(ThreadUtils.sameThread)
@@ -133,9 +134,6 @@ private[spark] class CoarseGrainedExecutorBackend(
           exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
       }
 
-    case RegisterExecutorFailed(message) =>
-      exitExecutor(1, "Slave registration failed: " + message)
-
     case LaunchTask(data) =>
       if (executor == null) {
         exitExecutor(1, "Received LaunchTask command but executor was null")
@@ -226,6 +224,10 @@ private[spark] class CoarseGrainedExecutorBackend(
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
+  // Message used internally to start the executor when the driver successfully accepted the
+  // registration request.
+  case object RegisteredExecutor
+
   case class Arguments(
       driverUrl: String,
       executorId: String,
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index 9ce2368..57317e7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -48,13 +48,6 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillExecutorsOnHost(host: String)
     extends CoarseGrainedClusterMessage
 
-  sealed trait RegisterExecutorResponse
-
-  case object RegisteredExecutor extends CoarseGrainedClusterMessage with RegisterExecutorResponse
-
-  case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
-    with RegisterExecutorResponse
-
   case class UpdateDelegationTokens(tokens: Array[Byte])
     extends CoarseGrainedClusterMessage
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7c7d8c2..031b9af 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -207,15 +207,14 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
           attributes, resources) =>
         if (executorDataMap.contains(executorId)) {
-          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
-          context.reply(true)
-        } else if (scheduler.nodeBlacklist.contains(hostname)) {
+          context.sendFailure(new IllegalStateException(s"Duplicate executor ID: $executorId"))
+        } else if (scheduler.nodeBlacklist.contains(hostname) ||
+            isBlacklisted(executorId, hostname)) {
           // If the cluster manager gives us an executor on a blacklisted node (because it
           // already started allocating those resources before we informed it of our blacklist,
           // or if it ignored our blacklist), then we reject that executor immediately.
           logInfo(s"Rejecting $executorId as it has been blacklisted.")
-          executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
-          context.reply(true)
+          context.sendFailure(new IllegalStateException(s"Executor is blacklisted: $executorId"))
         } else {
           // If the executor's rpc env is not listening for incoming connections, `hostPort`
           // will be null, and the client connection should be used to contact the executor.
@@ -250,7 +249,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
             }
           }
-          executorRef.send(RegisteredExecutor)
           // Note: some tests expect the reply to come after we put the executor in the map
           context.reply(true)
           listenerBus.post(
@@ -776,6 +774,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   protected def currentDelegationTokens: Array[Byte] = delegationTokens.get()
 
+  /**
+   * Checks whether the executor is blacklisted. This is called when the executor tries to
+   * register with the scheduler, and will deny registration if this method returns true.
+   *
+   * This is in addition to the blacklist kept by the task scheduler, so custom implementations
+   * don't need to check there.
+   */
+  protected def isBlacklisted(executorId: String, hostname: String): Boolean = false
+
   // SPARK-27112: We need to ensure that there is ordering of lock acquisition
   // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
   // the deadlock issue exposed in SPARK-27112
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index dd790b8..e316da7 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
-import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
@@ -34,7 +34,7 @@ import org.apache.spark.internal.config
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor, RegisterExecutorFailed}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{LaunchedExecutor, RegisterExecutor}
 
 /**
  * End-to-end tests for dynamic allocation in standalone mode.
@@ -482,12 +482,16 @@ class StandaloneDynamicAllocationSuite
       assert(apps.head.getExecutorLimit === Int.MaxValue)
     }
     val beforeList = getApplications().head.executors.keys.toSet
-    assert(killExecutorsOnHost(sc, "localhost").equals(true))
-
     syncExecutors(sc)
-    val afterList = getApplications().head.executors.keys.toSet
+
+    sc.schedulerBackend match {
+      case b: CoarseGrainedSchedulerBackend =>
+        b.killExecutorsOnHost("localhost")
+      case _ => fail("expected coarse grained scheduler")
+    }
 
     eventually(timeout(10.seconds), interval(100.millis)) {
+      val afterList = getApplications().head.executors.keys.toSet
       assert(beforeList.intersect(afterList).size == 0)
     }
   }
@@ -514,10 +518,11 @@ class StandaloneDynamicAllocationSuite
       val scheduler = new CoarseGrainedSchedulerBackend(taskScheduler, rpcEnv)
       try {
         scheduler.start()
-        scheduler.driverEndpoint.ask[Boolean](message)
-        eventually(timeout(10.seconds), interval(100.millis)) {
-          verify(endpointRef).send(RegisterExecutorFailed(any()))
+        val e = intercept[SparkException] {
+          scheduler.driverEndpoint.askSync[Boolean](message)
         }
+        assert(e.getCause().isInstanceOf[IllegalStateException])
+        assert(scheduler.getExecutorIds().isEmpty)
       } finally {
         scheduler.stop()
       }
@@ -536,6 +541,11 @@ class StandaloneDynamicAllocationSuite
       .setMaster(masterRpcEnv.address.toSparkURL)
       .setAppName("test")
       .set(config.EXECUTOR_MEMORY.key, "256m")
+      // Because we're faking executor launches in the Worker, set the config so that the driver
+      // will not timeout anything related to executors.
+      .set(config.Network.NETWORK_TIMEOUT.key, "2h")
+      .set(config.EXECUTOR_HEARTBEAT_INTERVAL.key, "1h")
+      .set(config.STORAGE_BLOCKMANAGER_SLAVE_TIMEOUT.key, "1h")
   }
 
   /** Make a master to which our application will send executor requests. */
@@ -549,8 +559,7 @@ class StandaloneDynamicAllocationSuite
   private def makeWorkers(cores: Int, memory: Int): Seq[Worker] = {
     (0 until numWorkers).map { i =>
       val rpcEnv = workerRpcEnvs(i)
-      val worker = new Worker(rpcEnv, 0, cores, memory, Array(masterRpcEnv.address),
-        Worker.ENDPOINT_NAME, null, conf, securityManager)
+      val worker = new TestWorker(rpcEnv, cores, memory)
       rpcEnv.setupEndpoint(Worker.ENDPOINT_NAME, worker)
       worker
     }
@@ -588,16 +597,6 @@ class StandaloneDynamicAllocationSuite
     }
   }
 
-  /** Kill the executors on a given host. */
-  private def killExecutorsOnHost(sc: SparkContext, host: String): Boolean = {
-    syncExecutors(sc)
-    sc.schedulerBackend match {
-      case b: CoarseGrainedSchedulerBackend =>
-        b.killExecutorsOnHost(host)
-      case _ => fail("expected coarse grained scheduler")
-    }
-  }
-
   /**
    * Return a list of executor IDs belonging to this application.
    *
@@ -620,9 +619,8 @@ class StandaloneDynamicAllocationSuite
    * we submit a request to kill them. This must be called before each kill request.
    */
   private def syncExecutors(sc: SparkContext): Unit = {
-    val driverExecutors = sc.env.blockManager.master.getStorageStatus
-      .map(_.blockManagerId.executorId)
-      .filter { _ != SparkContext.DRIVER_IDENTIFIER}
+    val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
+    val driverExecutors = backend.getExecutorIds()
     val masterExecutors = getExecutorIds(sc)
     val missingExecutors = masterExecutors.toSet.diff(driverExecutors.toSet).toSeq.sorted
     missingExecutors.foreach { id =>
@@ -632,10 +630,29 @@ class StandaloneDynamicAllocationSuite
       when(endpointRef.address).thenReturn(mockAddress)
       val message = RegisterExecutor(id, endpointRef, "localhost", 10, Map.empty, Map.empty,
         Map.empty)
-      val backend = sc.schedulerBackend.asInstanceOf[CoarseGrainedSchedulerBackend]
       backend.driverEndpoint.askSync[Boolean](message)
       backend.driverEndpoint.send(LaunchedExecutor(id))
     }
   }
 
+  /**
+   * Worker implementation that does not actually launch any executors, but reports them as
+   * running so the Master keeps track of them. This requires that `syncExecutors` be used
+   * to make sure the Master instance and the SparkContext under test agree about what
+   * executors are running.
+   */
+  private class TestWorker(rpcEnv: RpcEnv, cores: Int, memory: Int)
+    extends Worker(
+      rpcEnv, 0, cores, memory, Array(masterRpcEnv.address), Worker.ENDPOINT_NAME,
+      null, conf, securityManager) {
+
+    override def receive: PartialFunction[Any, Unit] = testReceive.orElse(super.receive)
+
+    private def testReceive: PartialFunction[Any, Unit] = synchronized {
+      case LaunchExecutor(_, appId, execId, _, _, _, _) =>
+        self.send(ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None))
+    }
+
+  }
+
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index f916f63..29160a3 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -189,6 +189,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     val conf = new SparkConf()
       .set(EXECUTOR_CORES, 1)
       .set(SCHEDULER_REVIVE_INTERVAL.key, "1m") // don't let it auto revive during test
+      .set(EXECUTOR_INSTANCES, 0) // avoid errors about duplicate executor registrations
       .setMaster(
       "coarseclustermanager[org.apache.spark.scheduler.TestCoarseGrainedSchedulerBackend]")
       .setAppName("test")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 2201bf9..b394f35 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -72,6 +72,11 @@ private[spark] class ExecutorPodsAllocator(
 
   private var lastSnapshot = ExecutorPodsSnapshot(Nil)
 
+  // Executors that have been deleted by this allocator but not yet detected as deleted in
+  // a snapshot from the API server. This is used to deny registration from these executors
+  // if they happen to come up before the deletion takes effect.
+  @volatile private var deletedExecutorIds = Set.empty[Long]
+
   def start(applicationId: String): Unit = {
     snapshotsStore.addSubscriber(podAllocationDelay) {
       onNewSnapshots(applicationId, _)
@@ -85,6 +90,8 @@ private[spark] class ExecutorPodsAllocator(
     }
   }
 
+  def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)
+
   private def onNewSnapshots(
       applicationId: String,
       snapshots: Seq[ExecutorPodsSnapshot]): Unit = synchronized {
@@ -141,10 +148,17 @@ private[spark] class ExecutorPodsAllocator(
       }
       .map { case (id, _) => id }
 
+    // Make a local, non-volatile copy of the reference since it's used multiple times. This
+    // is the only method that modifies the list, so this is safe.
+    var _deletedExecutorIds = deletedExecutorIds
+
     if (snapshots.nonEmpty) {
       logDebug(s"Pod allocation status: $currentRunningCount running, " +
         s"${currentPendingExecutors.size} pending, " +
         s"${newlyCreatedExecutors.size} unacknowledged.")
+
+      val existingExecs = lastSnapshot.executorPods.keySet
+      _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }
 
     val currentTotalExpectedExecutors = totalExpectedExecutors.get
@@ -169,6 +183,8 @@ private[spark] class ExecutorPodsAllocator(
 
       if (toDelete.nonEmpty) {
         logInfo(s"Deleting ${toDelete.size} excess pod requests (${toDelete.mkString(",")}).")
+        _deletedExecutorIds = _deletedExecutorIds ++ toDelete
+
         Utils.tryLogNonFatalError {
           kubernetesClient
             .pods()
@@ -209,6 +225,8 @@ private[spark] class ExecutorPodsAllocator(
       }
     }
 
+    deletedExecutorIds = _deletedExecutorIds
+
     // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
     // update method when not needed.
     hasPendingPods.set(knownPendingCount + newlyCreatedExecutors.size > 0)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index e221a92..105841a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -181,6 +181,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
     Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint))
   }
 
+  override protected def isBlacklisted(executorId: String, hostname: String): Boolean = {
+    podAllocator.isDeleted(executorId)
+  }
+
   private class KubernetesDriverEndpoint extends DriverEndpoint {
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index 1b6dfe5..9ac7e02 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -48,4 +48,13 @@ class DeterministicExecutorPodsSnapshotsStore extends ExecutorPodsSnapshotsStore
     currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
     snapshotsBuffer += currentSnapshot
   }
+
+  def removeDeletedExecutors(): Unit = {
+    val nonDeleted = currentSnapshot.executorPods.filter {
+      case (_, PodDeleted(_)) => false
+      case _ => true
+    }
+    currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
+    snapshotsBuffer += currentSnapshot
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 4475d5d..a0abded 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -189,6 +189,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     verify(podOperations, times(4)).create(any())
     verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
     verify(podOperations).delete()
+    assert(podsAllocatorUnderTest.isDeleted("3"))
+    assert(podsAllocatorUnderTest.isDeleted("4"))
+
+    // Update the snapshot to not contain the deleted executors, make sure the
+    // allocator cleans up internal state.
+    snapshotsStore.updatePod(deletedExecutor(3))
+    snapshotsStore.updatePod(deletedExecutor(4))
+    snapshotsStore.removeDeletedExecutors()
+    snapshotsStore.notifySubscribers()
+    assert(!podsAllocatorUnderTest.isDeleted("3"))
+    assert(!podsAllocatorUnderTest.isDeleted("4"))
   }
 
   private def executorPodAnswer(): Answer[SparkPod] =
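
The ExecutorPodsAllocator changes above boil down to bookkeeping over a set of deleted executor IDs: record IDs when excess pods are deleted, prune them once the API server stops reporting those pods, and let the scheduler backend consult the set when an executor tries to register. The following standalone Scala sketch illustrates that flow; onSnapshot, podsInSnapshot and newlyDeleted are invented names for illustration, while the real allocator derives both sets inside onNewSnapshots from ExecutorPodsSnapshot.

// Standalone sketch of the deletedExecutorIds bookkeeping; names other than
// isDeleted/deletedExecutorIds are invented for illustration.
object DeletedIdsSketch {

  @volatile private var deletedExecutorIds = Set.empty[Long]

  // Consulted by the scheduler backend when an executor tries to register.
  def isDeleted(executorId: String): Boolean =
    deletedExecutorIds.contains(executorId.toLong)

  // Called once per snapshot: forget IDs the API server no longer reports
  // (the deletion has taken effect), then record pods we just asked K8S to delete.
  def onSnapshot(podsInSnapshot: Set[Long], newlyDeleted: Set[Long]): Unit = synchronized {
    var ids = deletedExecutorIds                // local copy of the volatile reference
    ids = ids.filter(podsInSnapshot.contains)   // prune entries already gone from the snapshot
    ids ++= newlyDeleted                        // deny these if they try to register later
    deletedExecutorIds = ids
  }

  def main(args: Array[String]): Unit = {
    onSnapshot(podsInSnapshot = Set(1L, 2L, 3L, 4L), newlyDeleted = Set(3L, 4L))
    println(isDeleted("3"))  // true: deletion requested, pod still present in the last snapshot
    onSnapshot(podsInSnapshot = Set(1L, 2L), newlyDeleted = Set.empty)
    println(isDeleted("3"))  // false: the pod is gone, no need to keep tracking it
  }
}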

