This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bd7a62  [SPARK-33711][K8S] Avoid race condition between POD lifecycle 
manager and scheduler backend
6bd7a62 is described below

commit 6bd7a6200f8beaab1c68b2469df05870ea788d49
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Mon Jan 11 14:25:12 2021 -0800

    [SPARK-33711][K8S] Avoid race condition between POD lifecycle manager and 
scheduler backend
    
    ### What changes were proposed in this pull request?
    
    Missing POD detection is extended by a timestamp (and time limit) based check 
to avoid wrongly reporting PODs as missing.
    
    The two new timestamps:
    - `fullSnapshotTs` is introduced for the `ExecutorPodsSnapshot` which is only 
updated by the pod polling snapshot source
    - `registrationTs` is introduced for the `ExecutorData` and it is 
initialized at the executor registration at the scheduler backend
    
    Moreover a new config `spark.kubernetes.executor.missingPodDetectDelta` is 
used to specify the accepted delta between the two.
    
    ### Why are the changes needed?
    
    Watching a POD (`ExecutorPodsWatchSnapshotSource`) only informs about single 
POD changes. This could wrongly lead to the detection of missing PODs (PODs 
known by scheduler backend but missing from POD snapshots) by the executor POD 
lifecycle manager.
    
    A key indicator of this error is seeing this log message:
    
    > "The executor with ID [some_id] was not found in the cluster but we 
didn't get a reason why. Marking the executor as failed. The executor may have 
been deleted but the driver missed the deletion event."
    
    So one of the problems is running the missing POD detection check even when 
a single POD is changed without having a full consistent snapshot about all the 
PODs (see `ExecutorPodsPollingSnapshotSource`).
    The other problem could be the race between the executor POD lifecycle 
manager and the scheduler backend: so even in case of having a full snapshot 
the registration at the scheduler backend could precede the snapshot polling 
(and processing of those polled snapshots).
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. When the POD is missing then the reason message explaining the 
executor's exit is extended with both timestamps (the polling time and the 
executor registration time) and even the new config is mentioned.
    
    ### How was this patch tested?
    
    The existing unit tests are extended.
    
    Closes #30675 from attilapiros/SPARK-33711.
    
    Authored-by: “attilapiros” <piros.attila.zs...@gmail.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  6 ++-
 .../spark/scheduler/cluster/ExecutorData.scala     |  4 +-
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 11 +++++
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  2 +-
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala | 50 +++++++++++-----------
 .../cluster/k8s/ExecutorPodsSnapshot.scala         | 12 +++---
 .../k8s/ExecutorPodsSnapshotsStoreImpl.scala       |  8 +++-
 .../DeterministicExecutorPodsSnapshotsStore.scala  |  8 ++--
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala    | 22 +++++++---
 .../cluster/k8s/ExecutorPodsSnapshotSuite.scala    |  4 +-
 .../k8s/ExecutorPodsSnapshotsStoreSuite.scala      | 33 ++++++++------
 11 files changed, 101 insertions(+), 59 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 2bd0b4c..ccb5eb1 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -239,7 +239,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
           }
           val data = new ExecutorData(executorRef, executorAddress, hostname,
             0, cores, logUrlHandler.applyPattern(logUrls, attributes), 
attributes,
-            resourcesInfo, resourceProfileId)
+            resourcesInfo, resourceProfileId, registrationTs = 
System.currentTimeMillis())
           // This must be synchronized because variables mutated
           // in this block are read when requesting executors
           CoarseGrainedSchedulerBackend.this.synchronized {
@@ -629,6 +629,10 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
     executorDataMap.keySet.toSeq
   }
 
+  def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized {
+    executorDataMap.mapValues(v => v.registrationTs).toMap
+  }
+
   override def isExecutorActive(id: String): Boolean = synchronized {
     executorDataMap.contains(id) &&
       !executorsPendingToRemove.contains(id) &&
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
index 0621461..86b44e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
@@ -30,6 +30,7 @@ import org.apache.spark.scheduler.ExecutorResourceInfo
  * @param totalCores The total number of cores available to the executor
  * @param resourcesInfo The information of the currently available resources 
on the executor
  * @param resourceProfileId The id of the ResourceProfile being used by this 
executor
+ * @param registrationTs The registration timestamp of this executor
  */
 private[cluster] class ExecutorData(
     val executorEndpoint: RpcEndpointRef,
@@ -40,6 +41,7 @@ private[cluster] class ExecutorData(
     override val logUrlMap: Map[String, String],
     override val attributes: Map[String, String],
     override val resourcesInfo: Map[String, ExecutorResourceInfo],
-    override val resourceProfileId: Int
+    override val resourceProfileId: Int,
+    val registrationTs: Long
 ) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes,
   resourcesInfo, resourceProfileId)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 8dca875..5cc9395 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -477,6 +477,17 @@ private[spark] object Config extends Logging {
       .booleanConf
       .createWithDefault(false)
 
+  val KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA =
+    ConfigBuilder("spark.kubernetes.executor.missingPodDetectDelta")
+      .doc("When a registered executor's POD is missing from the Kubernetes 
API server's polled " +
+        "list of PODs then this delta time is taken as the accepted time 
difference between the " +
+        "registration time and the time of the polling. After this time the 
POD is considered " +
+        "missing from the cluster and the executor will be removed.")
+      .version("3.1.1")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .checkValue(delay => delay > 0, "delay must be a positive time value")
+      .createWithDefaultString("30s")
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = 
"spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_ANNOTATION_PREFIX = 
"spark.kubernetes.driver.service.annotation."
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 863cb28..f4cd2d0 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -86,7 +86,7 @@ private[spark] class ExecutorPodsAllocator(
 
   private val hasPendingPods = new AtomicBoolean()
 
-  private var lastSnapshot = ExecutorPodsSnapshot(Nil)
+  private var lastSnapshot = ExecutorPodsSnapshot()
 
   // Executors that have been deleted by this allocator but not yet detected 
as deleted in
   // a snapshot from the API server. This is used to deny registration from 
these executors
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 5d91e52..593d6f6 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -43,6 +43,9 @@ private[spark] class ExecutorPodsLifecycleManager(
   import ExecutorPodsLifecycleManager._
 
   private lazy val shouldDeleteExecutors = 
conf.get(KUBERNETES_DELETE_EXECUTORS)
+  private lazy val missingPodDetectDelta = 
conf.get(KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA)
+
+  private var lastFullSnapshotTs: Long = 0
 
   // Keep track of which pods are inactive to avoid contacting the API server 
multiple times.
   // This set is cleaned up when a snapshot containing the updated pod is 
processed.
@@ -109,33 +112,33 @@ private[spark] class ExecutorPodsLifecycleManager(
     // Reconcile the case where Spark claims to know about an executor but the 
corresponding pod
     // is missing from the cluster. This would occur if we miss a deletion 
event and the pod
     // transitions immediately from running to absent. We only need to check 
against the latest
-    // snapshot for this, and we don't do this for executors in the deleted 
executors cache or
-    // that we just removed in this round.
-    val lostExecutors = if (snapshots.nonEmpty) {
-      schedulerBackend.getExecutorIds().map(_.toLong).toSet --
+    // fresh full snapshot (coming from ExecutorPodsPollingSnapshotSource) for 
this, and we don't
+    // do this for executors in the deleted executors cache or that we just 
removed in this round.
+    if (snapshots.nonEmpty && lastFullSnapshotTs != 
snapshots.last.fullSnapshotTs) {
+      lastFullSnapshotTs = snapshots.last.fullSnapshotTs
+      val lostExecutorsWithRegistrationTs =
+        schedulerBackend.getExecutorsWithRegistrationTs().map(t => 
(t._1.toLong, t._2)) --
         snapshots.last.executorPods.keySet -- execIdsRemovedInThisRound
-    } else {
-      Nil
-    }
 
-    lostExecutors.foreach { lostId =>
-      if (removedExecutorsCache.getIfPresent(lostId) == null) {
-        val exitReasonMessage = s"The executor with ID $lostId was not found 
in the" +
-          s" cluster but we didn't get a reason why. Marking the executor as 
failed. The" +
-          s" executor may have been deleted but the driver missed the deletion 
event."
-        logDebug(exitReasonMessage)
-        val exitReason = ExecutorExited(
-          UNKNOWN_EXIT_CODE,
-          exitCausedByApp = false,
-          exitReasonMessage)
-        schedulerBackend.doRemoveExecutor(lostId.toString, exitReason)
+      lostExecutorsWithRegistrationTs.foreach { case (lostExecId, 
lostExecRegistrationTs) =>
+        if (removedExecutorsCache.getIfPresent(lostExecId) == null &&
+            lastFullSnapshotTs - lostExecRegistrationTs > 
missingPodDetectDelta) {
+          val exitReasonMessage = s"The executor with ID $lostExecId 
(registered at " +
+            s"$lostExecRegistrationTs ms) was not found in the cluster at the 
polling time " +
+            s"($lastFullSnapshotTs ms) which is after the accepted detect 
delta time " +
+            s"($missingPodDetectDelta ms) configured by " +
+            s"`${KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA.key}`. " +
+            "The executor may have been deleted but the driver missed the 
deletion event. " +
+            "Marking this executor as failed."
+          logDebug(exitReasonMessage)
+          val exitReason = ExecutorExited(
+            UNKNOWN_EXIT_CODE,
+            exitCausedByApp = false,
+            exitReasonMessage)
+          schedulerBackend.doRemoveExecutor(lostExecId.toString, exitReason)
+        }
       }
     }
-
-    if (lostExecutors.nonEmpty) {
-      logDebug(s"Removed executors with ids ${lostExecutors.mkString(",")}" +
-        s" from Spark that were either found to be deleted or non-existent in 
the cluster.")
-    }
   }
 
   private def onFinalNonDeletedState(
@@ -238,4 +241,3 @@ private[spark] class ExecutorPodsLifecycleManager(
 private object ExecutorPodsLifecycleManager {
   val UNKNOWN_EXIT_CODE = -1
 }
-
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
index cb4d881..76c17cf 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
@@ -29,13 +29,15 @@ import org.apache.spark.internal.Logging
 /**
  * An immutable view of the current executor pods that are running in the 
cluster.
  */
-private[spark] case class ExecutorPodsSnapshot(executorPods: Map[Long, 
ExecutorPodState]) {
+private[spark] case class ExecutorPodsSnapshot(
+    executorPods: Map[Long, ExecutorPodState],
+    fullSnapshotTs: Long) {
 
   import ExecutorPodsSnapshot._
 
   def withUpdate(updatedPod: Pod): ExecutorPodsSnapshot = {
     val newExecutorPods = executorPods ++ toStatesByExecutorId(Seq(updatedPod))
-    new ExecutorPodsSnapshot(newExecutorPods)
+    new ExecutorPodsSnapshot(newExecutorPods, fullSnapshotTs)
   }
 }
 
@@ -43,11 +45,11 @@ object ExecutorPodsSnapshot extends Logging {
   private var shouldCheckAllContainers: Boolean = _
   private var sparkContainerName: String = DEFAULT_EXECUTOR_CONTAINER_NAME
 
-  def apply(executorPods: Seq[Pod]): ExecutorPodsSnapshot = {
-    ExecutorPodsSnapshot(toStatesByExecutorId(executorPods))
+  def apply(executorPods: Seq[Pod], fullSnapshotTs: Long): 
ExecutorPodsSnapshot = {
+    ExecutorPodsSnapshot(toStatesByExecutorId(executorPods), fullSnapshotTs)
   }
 
-  def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, 
ExecutorPodState])
+  def apply(): ExecutorPodsSnapshot = ExecutorPodsSnapshot(Map.empty[Long, 
ExecutorPodState], 0)
 
   def setShouldCheckAllContainers(watchAllContainers: Boolean): Unit = {
     shouldCheckAllContainers = watchAllContainers
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
index 22764d9..df8769b 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreImpl.scala
@@ -28,6 +28,8 @@ import scala.util.control.NonFatal
 import io.fabric8.kubernetes.api.model.Pod
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+import org.apache.spark.util.SystemClock
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -54,7 +56,9 @@ import org.apache.spark.util.ThreadUtils
  * <br>
  * The subscriber notification callback is guaranteed to be called from a 
single thread at a time.
  */
-private[spark] class ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: 
ScheduledExecutorService)
+private[spark] class ExecutorPodsSnapshotsStoreImpl(
+    subscribersExecutor: ScheduledExecutorService,
+    clock: Clock = new SystemClock)
   extends ExecutorPodsSnapshotsStore with Logging {
 
   private val SNAPSHOT_LOCK = new Object()
@@ -99,7 +103,7 @@ private[spark] class 
ExecutorPodsSnapshotsStoreImpl(subscribersExecutor: Schedul
   }
 
   override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = 
SNAPSHOT_LOCK.synchronized {
-    currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
+    currentSnapshot = ExecutorPodsSnapshot(newSnapshot, clock.getTimeMillis())
     addCurrentSnapshotToSubscribers()
   }
 
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
index c30efde..dbe2f29 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeterministicExecutorPodsSnapshotsStore.scala
@@ -20,13 +20,15 @@ import io.fabric8.kubernetes.api.model.Pod
 import scala.collection.mutable
 
 import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
-
+import org.apache.spark.util.ManualClock
 
 class DeterministicExecutorPodsSnapshotsStore extends 
ExecutorPodsSnapshotsStore {
 
   ExecutorPodsSnapshot.setShouldCheckAllContainers(false)
   ExecutorPodsSnapshot.setSparkContainerName(DEFAULT_EXECUTOR_CONTAINER_NAME)
 
+  val clock = new ManualClock()
+
   private val snapshotsBuffer = mutable.Buffer.empty[ExecutorPodsSnapshot]
   private val subscribers = mutable.Buffer.empty[Seq[ExecutorPodsSnapshot] => 
Unit]
 
@@ -51,7 +53,7 @@ class DeterministicExecutorPodsSnapshotsStore extends 
ExecutorPodsSnapshotsStore
   }
 
   override def replaceSnapshot(newSnapshot: Seq[Pod]): Unit = {
-    currentSnapshot = ExecutorPodsSnapshot(newSnapshot)
+    currentSnapshot = ExecutorPodsSnapshot(newSnapshot, clock.getTimeMillis())
     snapshotsBuffer += currentSnapshot
   }
 
@@ -60,7 +62,7 @@ class DeterministicExecutorPodsSnapshotsStore extends 
ExecutorPodsSnapshotsStore
       case (_, PodDeleted(_)) => false
       case _ => true
     }
-    currentSnapshot = ExecutorPodsSnapshot(nonDeleted)
+    currentSnapshot = ExecutorPodsSnapshot(nonDeleted, clock.getTimeMillis())
     snapshotsBuffer += currentSnapshot
   }
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index fb6f3ac..762c452 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -57,7 +57,7 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite 
with BeforeAndAfte
     val removedExecutorsCache = 
CacheBuilder.newBuilder().build[java.lang.Long, java.lang.Long]
     snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
     namedExecutorPods = mutable.Map.empty[String, PodResource[Pod, 
DoneablePod]]
-    when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty[String])
+    
when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map.empty[String,
 Long])
     when(kubernetesClient.pods()).thenReturn(podOperations)
     
when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
     eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
@@ -92,13 +92,23 @@ class ExecutorPodsLifecycleManagerSuite extends 
SparkFunSuite with BeforeAndAfte
 
   test("When the scheduler backend lists executor ids that aren't present in 
the cluster," +
     " remove those executors from Spark.") {
-    when(schedulerBackend.getExecutorIds()).thenReturn(Seq("1"))
-    val msg = s"The executor with ID 1 was not found in the cluster but we 
didn't" +
-      s" get a reason why. Marking the executor as failed. The executor may 
have been" +
-      s" deleted but the driver missed the deletion event."
-    val expectedLossReason = ExecutorExited(-1, exitCausedByApp = false, msg)
+      
when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map("1" -> 
7L))
+    val missingPodDelta =
+      
eventHandlerUnderTest.conf.get(Config.KUBERNETES_EXECUTOR_MISSING_POD_DETECT_DELTA)
+    snapshotsStore.clock.advance(missingPodDelta + 7)
     snapshotsStore.replaceSnapshot(Seq.empty[Pod])
     snapshotsStore.notifySubscribers()
+    verify(schedulerBackend, never()).doRemoveExecutor(any(), any())
+
+    // 1 more millisecond and the accepted delta is over so the missing POD 
will be detected
+    snapshotsStore.clock.advance(1)
+    snapshotsStore.replaceSnapshot(Seq.empty[Pod])
+    snapshotsStore.notifySubscribers()
+    val msg = "The executor with ID 1 (registered at 7 ms) was not found in 
the cluster at " +
+      "the polling time (30008 ms) which is after the accepted detect delta 
time (30000 ms) " +
+      "configured by `spark.kubernetes.executor.missingPodDetectDelta`. The 
executor may have " +
+      "been deleted but the driver missed the deletion event. Marking this 
executor as failed."
+    val expectedLossReason = ExecutorExited(-1, exitCausedByApp = false, msg)
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }
 
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
index 8d285ab..5e66726 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotSuite.scala
@@ -27,7 +27,7 @@ class ExecutorPodsSnapshotSuite extends SparkFunSuite {
     (pod, state(pod))
 
   def doTest(testCases: Seq[(Pod, ExecutorPodState)]): Unit = {
-    val snapshot = ExecutorPodsSnapshot(testCases.map(_._1))
+    val snapshot = ExecutorPodsSnapshot(testCases.map(_._1), 0)
     for (((_, state), i) <- testCases.zipWithIndex) {
       assertResult(state.getClass.getName, s"executor ID $i") {
         snapshot.executorPods(i).getClass.getName
@@ -70,7 +70,7 @@ class ExecutorPodsSnapshotSuite extends SparkFunSuite {
     val originalPods = Seq(
       pendingExecutor(0),
       runningExecutor(1))
-    val originalSnapshot = ExecutorPodsSnapshot(originalPods)
+    val originalSnapshot = ExecutorPodsSnapshot(originalPods, 0)
     val snapshotWithUpdatedPod = 
originalSnapshot.withUpdate(succeededExecutor(1))
     assert(snapshotWithUpdatedPod.executorPods ===
       Map(
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
index 614c198..b4240bb 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshotsStoreSuite.scala
@@ -26,15 +26,18 @@ import scala.collection.mutable
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.util.ManualClock
 
 class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite with 
BeforeAndAfter {
 
   private var eventBufferScheduler: DeterministicScheduler = _
   private var eventQueueUnderTest: ExecutorPodsSnapshotsStoreImpl = _
+  private var clock: ManualClock = _
 
   before {
     eventBufferScheduler = new DeterministicScheduler()
-    eventQueueUnderTest = new 
ExecutorPodsSnapshotsStoreImpl(eventBufferScheduler)
+    clock = new ManualClock()
+    eventQueueUnderTest = new 
ExecutorPodsSnapshotsStoreImpl(eventBufferScheduler, clock)
     ExecutorPodsSnapshot.setShouldCheckAllContainers(false)
   }
 
@@ -52,6 +55,7 @@ class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite 
with BeforeAndAfter
     assert(receivedSnapshots1 === Seq(ExecutorPodsSnapshot()))
     assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
 
+    clock.advance(100)
     pushPodWithIndex(1)
     // Force time to move forward so that the buffer is emitted, scheduling the
     // processing task on the subscription executor...
@@ -60,7 +64,7 @@ class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite 
with BeforeAndAfter
 
     assert(receivedSnapshots1 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
     assert(receivedSnapshots2 === Seq(ExecutorPodsSnapshot()))
 
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
@@ -68,29 +72,29 @@ class ExecutorPodsSnapshotsStoreSuite extends SparkFunSuite 
with BeforeAndAfter
     // Don't repeat snapshots
     assert(receivedSnapshots1 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
     assert(receivedSnapshots2 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
     pushPodWithIndex(2)
     pushPodWithIndex(3)
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
 
     assert(receivedSnapshots1 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), 
podWithIndex(3)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), 
podWithIndex(3)), 0)))
     assert(receivedSnapshots2 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
 
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
     assert(receivedSnapshots1 === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), 
podWithIndex(3)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(1), podWithIndex(2), 
podWithIndex(3)), 0)))
     assert(receivedSnapshots1 === receivedSnapshots2)
   }
 
@@ -113,13 +117,14 @@ class ExecutorPodsSnapshotsStoreSuite extends 
SparkFunSuite with BeforeAndAfter
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
     assert(receivedSnapshots === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0)))
+    clock.advance(100)
     eventQueueUnderTest.replaceSnapshot(Seq(podWithIndex(2)))
     eventBufferScheduler.tick(1000, TimeUnit.MILLISECONDS)
     assert(receivedSnapshots === Seq(
       ExecutorPodsSnapshot(),
-      ExecutorPodsSnapshot(Seq(podWithIndex(1))),
-      ExecutorPodsSnapshot(Seq(podWithIndex(2)))))
+      ExecutorPodsSnapshot(Seq(podWithIndex(1)), 0),
+      ExecutorPodsSnapshot(Seq(podWithIndex(2)), 100)))
   }
 
   private def pushPodWithIndex(index: Int): Unit =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to