This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 28f2d51  [SPARK-34361][K8S] In case of downscaling avoid killing of 
executors already known by the scheduler backend in the pod allocator
28f2d51 is described below

commit 28f2d51e5950a8b99a37814c9396b23315880fe5
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Tue Mar 2 16:58:29 2021 -0800

    [SPARK-34361][K8S] In case of downscaling avoid killing of executors 
already known by the scheduler backend in the pod allocator
    
    ### What changes were proposed in this pull request?
    
    This PR modifies the POD allocator to use the scheduler backend to get the 
known executors and remove those from the pending and newly created list.
    
    This is different from the normal killing of executors requested by the 
`ExecutorAllocationManager`, where `spark.dynamicAllocation.executorIdleTimeout` 
is used.
    In this case the executors are killed by the POD allocator, which should 
only be responsible for terminating unsatisfied POD allocations (new requests 
where no POD state has been received yet and PODs in pending state).
    
    ### Why are the changes needed?
    
    Because there is a race between the executor POD allocator and the cluster 
scheduler backend.
    Running several experiments during downscaling, we experienced a lot of 
killed fresh executors which already had running tasks on them.
    
    The pattern in the log was the following (see executor 312 and TID 2079):
    
    ```
    21/02/01 15:12:03 INFO ExecutorMonitor: New executor 312 has registered 
(new total is 138)
    ...
    21/02/01 15:12:03 INFO TaskSetManager: Starting task 247.0 in stage 4.0 
(TID 2079, 100.100.18.138, executor 312, partition 247, PROCESS_LOCAL, 8777 
bytes)
    21/02/01 15:12:03 INFO ExecutorPodsAllocator: Deleting 3 excess pod 
requests (408,312,307).
    ...
    21/02/01 15:12:04 ERROR TaskSchedulerImpl: Lost executor 312 on 
100.100.18.138: The executor with id 312 was deleted by a user or the framework.
    21/02/01 15:12:04 INFO TaskSetManager: Task 2079 failed because while it 
was being computed, its executor exited for a reason unrelated to the task. Not 
counting this failure towards the maximum number of failures for the task.
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    #### Manually
    
    With this change no executor with a running task on it was lost.
    
    ##### With unit test
    
    A new test is added and an existing test is modified to check these cases.
    
    Closes #31513 from attilapiros/SPARK-34361.
    
    Authored-by: “attilapiros” <piros.attila.zs...@gmail.com>
    Signed-off-by: Holden Karau <hka...@apple.com>
    (cherry picked from commit 6c5322de6176726955b4bc941f92ecaa54a7f539)
    Signed-off-by: Holden Karau <hka...@apple.com>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala |   3 +-
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  66 ++++++---
 .../k8s/KubernetesClusterSchedulerBackend.scala    |   2 +-
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 155 ++++++++++++++++++++-
 .../KubernetesClusterSchedulerBackendSuite.scala   |   2 +-
 5 files changed, 204 insertions(+), 24 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 68dcef1..bec29a9 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -237,7 +237,8 @@ private[spark] object Config extends Logging {
 
   val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT =
     ConfigBuilder("spark.kubernetes.allocation.executor.timeout")
-      .doc("Time to wait before considering a pending executor timedout.")
+      .doc("Time to wait before a newly created executor POD request, which 
does not reached " +
+        "the POD pending state yet, considered timedout and will be deleted.")
       .version("3.1.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .checkValue(value => value > 0, "Allocation executor timeout must be a 
positive time value.")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index eb35de8..5fc81a6 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.time.Instant
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
+import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -82,9 +82,14 @@ private[spark] class ExecutorPodsAllocator(
   // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they 
were created.
   private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, (Int, 
Long)]
 
+  // Executor IDs that have been requested from Kubernetes but have not been 
detected in any POD
+  // snapshot yet but already known by the scheduler backend. Mapped to the 
ResourceProfile id.
+  private val schedulerKnownNewlyCreatedExecs = 
mutable.LinkedHashMap.empty[Long, Int]
+
   private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf)
 
-  private val hasPendingPods = new AtomicBoolean()
+  // visible for tests
+  private[k8s] val numOutstandingPods = new AtomicInteger()
 
   private var lastSnapshot = ExecutorPodsSnapshot()
 
@@ -93,9 +98,9 @@ private[spark] class ExecutorPodsAllocator(
   // if they happen to come up before the deletion takes effect.
   @volatile private var deletedExecutorIds = Set.empty[Long]
 
-  def start(applicationId: String): Unit = {
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
     snapshotsStore.addSubscriber(podAllocationDelay) {
-      onNewSnapshots(applicationId, _)
+      onNewSnapshots(applicationId, schedulerBackend, _)
     }
   }
 
@@ -105,7 +110,7 @@ private[spark] class ExecutorPodsAllocator(
       totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs)
     }
     logDebug(s"Set total expected execs to 
$totalExpectedExecutorsPerResourceProfileId")
-    if (!hasPendingPods.get()) {
+    if (numOutstandingPods.get() == 0) {
       snapshotsStore.notifySubscribers()
     }
   }
@@ -114,8 +119,19 @@ private[spark] class ExecutorPodsAllocator(
 
   private def onNewSnapshots(
       applicationId: String,
+      schedulerBackend: KubernetesClusterSchedulerBackend,
       snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
-    newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys)
+    val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys)
+    newlyCreatedExecutors --= k8sKnownExecIds
+    schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds
+
+    // transfer the scheduler backend known executor requests from the 
newlyCreatedExecutors
+    // to the schedulerKnownNewlyCreatedExecs
+    val schedulerKnownExecs = 
schedulerBackend.getExecutorIds().map(_.toLong).toSet
+    schedulerKnownNewlyCreatedExecs ++=
+      
newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
+    newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet
+
     // For all executors we've created against the API but have not seen in a 
snapshot
     // yet - check the current time. If the current time has exceeded some 
threshold,
     // assume that the pod was either never created (the API server never 
properly
@@ -164,15 +180,16 @@ private[spark] class ExecutorPodsAllocator(
       _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains)
     }
 
+    val notDeletedPods = 
lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_))
     // Map the pods into per ResourceProfile id so we can check per 
ResourceProfile,
     // add a fast path if not using other ResourceProfiles.
     val rpIdToExecsAndPodState =
       mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]()
     if (totalExpectedExecutorsPerResourceProfileId.size <= 1) {
       rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) =
-        mutable.HashMap.empty ++= lastSnapshot.executorPods
+        mutable.HashMap.empty ++= notDeletedPods
     } else {
-      lastSnapshot.executorPods.foreach { case (execId, execPodState) =>
+      notDeletedPods.foreach { case (execId, execPodState) =>
         val rpId = 
execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt
         val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId,
           mutable.HashMap[Long, ExecutorPodState]())
@@ -190,24 +207,33 @@ private[spark] class ExecutorPodsAllocator(
         case _ => false
       }
 
-      val currentPendingExecutors = podsForRpId.filter {
+      val (schedulerKnownPendingExecsForRpId, currentPendingExecutorsForRpId) 
= podsForRpId.filter {
         case (_, PodPending(_)) => true
         case _ => false
+      }.partition { case (k, _) =>
+        schedulerKnownExecs.contains(k)
       }
       // This variable is used later to print some debug logs. It's updated 
when cleaning up
-      // excess pod requests, since currentPendingExecutors is immutable.
-      var knownPendingCount = currentPendingExecutors.size
+      // excess pod requests, since currentPendingExecutorsForRpId is 
immutable.
+      var knownPendingCount = currentPendingExecutorsForRpId.size
 
       val newlyCreatedExecutorsForRpId =
         newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
           rpId == waitingRpId
         }
 
+      val schedulerKnownNewlyCreatedExecsForRpId =
+        schedulerKnownNewlyCreatedExecs.filter { case (_, waitingRpId) =>
+          rpId == waitingRpId
+        }
+
       if (podsForRpId.nonEmpty) {
         logDebug(s"ResourceProfile Id: $rpId " +
           s"pod allocation status: $currentRunningCount running, " +
-          s"${currentPendingExecutors.size} pending. " +
-          s"${newlyCreatedExecutorsForRpId.size} unacknowledged.")
+          s"${currentPendingExecutorsForRpId.size} unknown pending, " +
+          s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known 
pending, " +
+          s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
+          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend 
known newly created.")
       }
 
       // It's possible that we have outstanding pods that are outdated when 
dynamic allocation
@@ -218,8 +244,9 @@ private[spark] class ExecutorPodsAllocator(
       //
       // TODO: with dynamic allocation off, handle edge cases if we end up 
with more running
       // executors than expected.
-      val knownPodCount = currentRunningCount + currentPendingExecutors.size +
-        newlyCreatedExecutorsForRpId.size
+      val knownPodCount = currentRunningCount +
+        currentPendingExecutorsForRpId.size + 
schedulerKnownPendingExecsForRpId.size +
+        newlyCreatedExecutorsForRpId.size + 
schedulerKnownNewlyCreatedExecsForRpId.size
 
       if (knownPodCount > targetNum) {
         val excess = knownPodCount - targetNum
@@ -227,7 +254,7 @@ private[spark] class ExecutorPodsAllocator(
           .filter { case (_, (_, createTime)) =>
             currentTime - createTime > executorIdleTimeout
           }.keys.take(excess).toList
-        val knownPendingToDelete = currentPendingExecutors
+        val knownPendingToDelete = currentPendingExecutorsForRpId
           .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
           .take(excess - newlyCreatedToDelete.size)
           .map { case (id, _) => id }
@@ -245,7 +272,7 @@ private[spark] class ExecutorPodsAllocator(
               .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
               .withLabelIn(SPARK_EXECUTOR_ID_LABEL, 
toDelete.sorted.map(_.toString): _*)
               .delete()
-            newlyCreatedExecutors --= toDelete
+            newlyCreatedExecutors --= newlyCreatedToDelete
             knownPendingCount -= knownPendingToDelete.size
           }
         }
@@ -276,8 +303,9 @@ private[spark] class ExecutorPodsAllocator(
     deletedExecutorIds = _deletedExecutorIds
 
     // Update the flag that helps the setTotalExpectedExecutors() callback 
avoid triggering this
-    // update method when not needed.
-    hasPendingPods.set(totalPendingCount + newlyCreatedExecutors.size > 0)
+    // update method when not needed. PODs known by the scheduler backend are 
not counted here as
+    // they considered running PODs and they should not block upscaling.
+    numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size)
   }
 
   private def requestNewExecutors(
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index c35a434..d58e38a 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -93,7 +93,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val initExecs = Map(defaultProfile -> initialExecutors)
     podAllocator.setTotalExpectedExecutors(initExecs)
     lifecycleEventHandler.start(this)
-    podAllocator.start(applicationId())
+    podAllocator.start(applicationId(), this)
     watchEvents.start(applicationId())
     pollEvents.start(applicationId())
     setUpExecutorConfigMap()
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 349bbcd..55be80a 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -81,6 +81,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
   @Mock
   private var executorBuilder: KubernetesExecutorBuilder = _
 
+  @Mock
+  private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+
   private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
 
   private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
@@ -96,12 +99,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     waitForExecutorPodsClock = new ManualClock(0L)
     podsAllocatorUnderTest = new ExecutorPodsAllocator(
       conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, 
waitForExecutorPodsClock)
-    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID)
+    when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
   }
 
   test("Initially request executors in batches. Do not request another batch 
if the" +
     " first has not finished.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 
(podAllocationSize + 1)))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     for (nextId <- 1 to podAllocationSize) {
       verify(podOperations).create(podWithAttachedContainerForId(nextId))
     }
@@ -111,28 +116,34 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
   test("Request executors in batches. Allow another batch to be requested if" +
     " all pending executors start running.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 
(podAllocationSize + 1)))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     for (execId <- 1 until podAllocationSize) {
       snapshotsStore.updatePod(runningExecutor(execId))
     }
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     verify(podOperations, 
never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
     snapshotsStore.updatePod(runningExecutor(podAllocationSize))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     
verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 
1))
     snapshotsStore.updatePod(runningExecutor(podAllocationSize))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     verify(podOperations, times(podAllocationSize + 
1)).create(any(classOf[Pod]))
   }
 
   test("When a current batch reaches error states immediately, re-request" +
     " them on the next batch.") {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 
podAllocationSize))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     for (execId <- 1 until podAllocationSize) {
       snapshotsStore.updatePod(runningExecutor(execId))
     }
     val failedPod = failedExecutorWithoutDeletion(podAllocationSize)
     snapshotsStore.updatePod(failedPod)
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     
verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 
1))
   }
 
@@ -148,9 +159,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
       .thenReturn(labeledPods)
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     verify(podOperations).create(podWithAttachedContainerForId(1))
     waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     verify(labeledPods).delete()
     verify(podOperations).create(podWithAttachedContainerForId(2))
   }
@@ -174,17 +187,20 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
 
     // Target 1 executor, make sure it's requested, even with an empty initial 
snapshot.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     verify(podOperations).create(podWithAttachedContainerForId(1))
 
     // Mark executor as running, verify that subsequent allocation cycle is a 
no-op.
     snapshotsStore.updatePod(runningExecutor(1))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
     verify(podOperations, times(1)).create(any())
     verify(podOperations, never()).delete()
 
     // Request 3 more executors, make sure all are requested.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
     verify(podOperations).create(podWithAttachedContainerForId(2))
     verify(podOperations).create(podWithAttachedContainerForId(3))
     verify(podOperations).create(podWithAttachedContainerForId(4))
@@ -193,6 +209,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     snapshotsStore.updatePod(runningExecutor(2))
     snapshotsStore.updatePod(pendingExecutor(3))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
     verify(podOperations, times(4)).create(any())
     verify(podOperations, never()).delete()
 
@@ -200,6 +217,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
     verify(podOperations, times(4)).create(any())
     verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
     verify(podOperations).delete()
@@ -212,6 +230,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     snapshotsStore.updatePod(deletedExecutor(4))
     snapshotsStore.removeDeletedExecutors()
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
     assert(!podsAllocatorUnderTest.isDeleted("3"))
     assert(!podsAllocatorUnderTest.isDeleted("4"))
   }
@@ -279,6 +298,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     waitForExecutorPodsClock.setTime(startTime)
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     verify(podOperations).create(podWithAttachedContainerForId(1))
     verify(podOperations).create(podWithAttachedContainerForId(2))
     verify(podOperations).create(podWithAttachedContainerForId(3))
@@ -292,16 +312,139 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
     // Newly created executors (both acknowledged and not) are protected by 
executorIdleTimeout
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 0))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", 
"2", "3", "4", "5")
     verify(podOperations, never()).delete()
 
     // Newly created executors (both acknowledged and not) are cleaned up.
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "3", "4"))
     snapshotsStore.notifySubscribers()
-    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", 
"4", "5")
+    // SPARK-34361: even as 1, 3 and 4 are not timed out as they are 
considered as known PODs so
+    // this is why they are not counted into the outstanding PODs and /they 
are not removed even
+    // though executor 1 is still in pending state and executor 3 and 4 are 
new request without
+    // any state reported by kubernetes and all the three are already timed out
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5")
     verify(podOperations).delete()
   }
 
+  /**
+   * This test covers some downscaling and upscaling of dynamic allocation on 
kubernetes
+   * along with multiple resource profiles (default and rp) when some executors
+   * already know by the scheduler backend.
+   *
+   * Legend:
+   *
+   * N-: newly created not known by the scheduler backend
+   * N+: newly created known by the scheduler backend
+   * P- / P+ : pending (not know / known) by the scheduler backend
+   * D: deleted
+   *                                       |   default    ||         rp        
| expected
+   *                                       |              ||                   
| outstanding
+   *                                       | 1  | 2  | 3  || 4  | 5  | 6  | 7  
| PODs
+   * 
==========================================================================================
+   *  0) setTotalExpectedExecs with        | N- | N- | N- || N- | N- | N- | N- 
|
+   *       default->3, ro->4               |    |    |    ||    |    |    |    
|      7
+   * 
------------------------------------------------------------------------------------------
+   *  1) make 1 from each rp               | N+ | N- | N- || N+ | N- | N- | N- 
|
+   *     known by backend                  |    |    |    ||    |    |    |    
|      5
+   * 
-------------------------------------------------------------------------------------------
+   *  2) some more backend known + pending | N+ | P+ | P- || N+ | P+ | P- | N- 
|      3
+   * 
-------------------------------------------------------------------------------------------
+   *  3) advance time with idle timeout    |    |    |    ||    |    |    |    
|
+   *     setTotalExpectedExecs with        | N+ | P+ | D  || N+ | P+ | D  | D  
|      0
+   *       default->1, rp->1               |    |    |    ||    |    |    |    
|
+   * 
-------------------------------------------------------------------------------------------
+   *  4) setTotalExpectedExecs with        | N+ | P+ | D  || N+ | P+ | D  | D  
|      0 and
+   *       default->2, rp->2               |    |    |    ||    |    |    |    
| no new POD req.
+   * 
===========================================================================================
+   *
+   *  5) setTotalExpectedExecs with default -> 3, rp -> 3 which will lead to 
creation of the new
+   *     PODs: 8 and 9
+   */
+  test("SPARK-34361: scheduler backend known pods with multiple resource 
profiles at downscaling") {
+    when(podOperations
+      .withField("status.phase", "Pending"))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
+      .thenReturn(podOperations)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    val rpb = new ResourceProfileBuilder()
+    val ereq = new ExecutorResourceRequests()
+    val treq = new TaskResourceRequests()
+    ereq.cores(4).memory("2g")
+    treq.cpus(2)
+    rpb.require(ereq).require(treq)
+    val rp = rpb.build()
+
+    // 0) request 3 PODs for the default and 4 PODs for the other resource 
profile
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, 
rp -> 4))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 7)
+    verify(podOperations).create(podWithAttachedContainerForId(1, 
defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(2, 
defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(3, 
defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
+    verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
+    verify(podOperations).create(podWithAttachedContainerForId(6, rp.id))
+    verify(podOperations).create(podWithAttachedContainerForId(7, rp.id))
+
+    // 1) make 1 POD known by the scheduler backend for each resource profile
+    when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "4"))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5,
+      "scheduler backend known PODs are not outstanding")
+    verify(podOperations, times(7)).create(any())
+
+    // 2) make 1 extra POD known by the scheduler backend for each resource 
profile
+    // and make some to pending
+    when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "2", "4", "5"))
+    snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(3, defaultProfile.id))
+    snapshotsStore.updatePod(pendingExecutor(5, rp.id))
+    snapshotsStore.updatePod(pendingExecutor(6, rp.id))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
+    verify(podOperations, times(7)).create(any())
+
+    // 3) downscale to 1 POD for default and 1 POD for the other resource 
profile
+    waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, 
rp -> 1))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podOperations, times(7)).create(any())
+    verify(podOperations, times(2)).delete()
+    assert(podsAllocatorUnderTest.isDeleted("3"))
+    assert(podsAllocatorUnderTest.isDeleted("6"))
+    assert(podsAllocatorUnderTest.isDeleted("7"))
+
+    // 4) upscale to 2 PODs for default and 2 for the other resource profile 
but as there is still
+    // 2 PODs known by the scheduler backend there must be no new POD 
requested to be created
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, 
rp -> 2))
+    snapshotsStore.notifySubscribers()
+    verify(podOperations, times(7)).create(any())
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
+    verify(podOperations, times(7)).create(any())
+
+    // 5) requesting 1 more executor for each resource
+    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, 
rp -> 3))
+    snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
+    verify(podOperations, times(9)).create(any())
+    verify(podOperations).create(podWithAttachedContainerForId(8, 
defaultProfile.id))
+    verify(podOperations).create(podWithAttachedContainerForId(9, rp.id))
+  }
+
   test("SPARK-33288: multiple resource profiles") {
     when(podOperations
       .withField("status.phase", "Pending"))
@@ -330,6 +473,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     // Target 1 executor for default profile, 2 for other profile,
     // make sure it's requested, even with an empty initial snapshot.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, 
rp -> 2))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
     verify(podOperations).create(podWithAttachedContainerForId(1, 
defaultProfile.id))
     verify(podOperations).create(podWithAttachedContainerForId(2, rp.id))
     verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
@@ -339,6 +483,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     snapshotsStore.updatePod(runningExecutor(2, rp.id))
     snapshotsStore.updatePod(runningExecutor(3, rp.id))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
     verify(podOperations, times(3)).create(any())
     verify(podOperations, never()).delete()
 
@@ -346,6 +491,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     // make sure all are requested.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, 
rp -> 3))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
     verify(podOperations).create(podWithAttachedContainerForId(4, 
defaultProfile.id))
     verify(podOperations).create(podWithAttachedContainerForId(5, 
defaultProfile.id))
     verify(podOperations).create(podWithAttachedContainerForId(6, 
defaultProfile.id))
@@ -356,6 +502,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     snapshotsStore.updatePod(pendingExecutor(5, defaultProfile.id))
     snapshotsStore.updatePod(pendingExecutor(7, rp.id))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
     verify(podOperations, times(7)).create(any())
     verify(podOperations, never()).delete()
 
@@ -364,6 +511,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, 
rp -> 1))
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
     verify(podOperations, times(7)).create(any())
     verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6")
     verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7")
@@ -379,6 +527,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     snapshotsStore.updatePod(deletedExecutor(7))
     snapshotsStore.removeDeletedExecutors()
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
     assert(!podsAllocatorUnderTest.isDeleted("5"))
     assert(!podsAllocatorUnderTest.isDeleted("6"))
     assert(!podsAllocatorUnderTest.isDeleted("7"))
@@ -399,6 +548,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
       .thenReturn(podOperations)
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6))
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     // Initial request of pods
     verify(podOperations).create(podWithAttachedContainerForId(1))
     verify(podOperations).create(podWithAttachedContainerForId(2))
@@ -414,6 +564,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     // We move forward one allocation cycle
     waitForExecutorPodsClock.setTime(podAllocationDelay + 1)
     snapshotsStore.notifySubscribers()
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
     // We request pod 6
     verify(podOperations).create(podWithAttachedContainerForId(6))
   }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 861d41c..e4a73e2 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -127,7 +127,7 @@ class KubernetesClusterSchedulerBackendSuite extends 
SparkFunSuite with BeforeAn
   test("Start all components") {
     schedulerBackendUnderTest.start()
     verify(podAllocator).setTotalExpectedExecutors(Map(defaultProfile -> 3))
-    verify(podAllocator).start(TEST_SPARK_APP_ID)
+    verify(podAllocator).start(TEST_SPARK_APP_ID, schedulerBackendUnderTest)
     verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
     verify(watchEvents).start(TEST_SPARK_APP_ID)
     verify(pollEvents).start(TEST_SPARK_APP_ID)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to