This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 28f2d51  [SPARK-34361][K8S] In case of downscaling avoid killing of executors already known by the scheduler backend in the pod allocator
28f2d51 is described below

commit 28f2d51e5950a8b99a37814c9396b23315880fe5
Author: “attilapiros” <piros.attila.zs...@gmail.com>
AuthorDate: Tue Mar 2 16:58:29 2021 -0800

[SPARK-34361][K8S] In case of downscaling avoid killing of executors already known by the scheduler backend in the pod allocator

### What changes were proposed in this pull request?

This PR modifies the POD allocator to use the scheduler backend to get the known executors and remove them from the pending and newly created lists (see the illustrative sketch below).

This is different from the executor killing requested by the normal `ExecutorAllocationManager`, where `spark.dynamicAllocation.executorIdleTimeout` is used. The POD allocator should only be responsible for terminating unsatisfied POD allocations: new requests for which no POD state has been received yet, and PODs still in the pending state.

### Why are the changes needed?

Because there is a race between the executor POD allocator and the cluster scheduler backend. Running several experiments during downscaling, we saw many freshly started executors killed while they already had running tasks on them.

The pattern in the log was the following (see executor 312 and TID 2079):

```
21/02/01 15:12:03 INFO ExecutorMonitor: New executor 312 has registered (new total is 138)
...
21/02/01 15:12:03 INFO TaskSetManager: Starting task 247.0 in stage 4.0 (TID 2079, 100.100.18.138, executor 312, partition 247, PROCESS_LOCAL, 8777 bytes)
21/02/01 15:12:03 INFO ExecutorPodsAllocator: Deleting 3 excess pod requests (408,312,307).
...
21/02/01 15:12:04 ERROR TaskSchedulerImpl: Lost executor 312 on 100.100.18.138: The executor with id 312 was deleted by a user or the framework.
21/02/01 15:12:04 INFO TaskSetManager: Task 2079 failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

#### Manually

With this change no executor with a running task on it was lost.

##### With unit tests

A new test was added and an existing test was modified to check these cases.

Closes #31513 from attilapiros/SPARK-34361.
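To make the exclusion concrete, here is a minimal, self-contained Scala sketch (not code from the commit): the `SchedulerBackendView` trait, the `PodAllocatorSketch` object and `excessToDelete` are hypothetical stand-ins, and the real implementation additionally distinguishes pending pods from not-yet-acknowledged requests, works per resource profile, and respects `spark.kubernetes.allocation.executor.timeout`, as shown in the diff further down.

```scala
// Simplified sketch of the idea: never pick scheduler-known executors for deletion.
object PodAllocatorSketch {

  // Stand-in for KubernetesClusterSchedulerBackend.getExecutorIds().
  trait SchedulerBackendView {
    def getExecutorIds(): Seq[String]
  }

  // Given the executor ids the allocator still considers outstanding (newly requested
  // or pending pods) and the target executor count, return the ids that are safe to
  // delete: executors already registered with the scheduler backend are excluded,
  // because they may already be running tasks.
  def excessToDelete(
      outstanding: Seq[Long],
      targetNum: Int,
      backend: SchedulerBackendView): Seq[Long] = {
    val schedulerKnown = backend.getExecutorIds().map(_.toLong).toSet
    val unknownOutstanding = outstanding.filterNot(schedulerKnown.contains)
    val excess = math.max(0, outstanding.size - targetNum)
    // Only delete from the executors the scheduler backend has never seen.
    unknownOutstanding.take(excess)
  }

  def main(args: Array[String]): Unit = {
    // Executor 312 has already registered with the scheduler backend, as in the log above.
    val backend = new SchedulerBackendView {
      override def getExecutorIds(): Seq[String] = Seq("312")
    }
    // Outstanding pod requests 307, 312 and 408 while downscaling to one executor.
    println(excessToDelete(Seq(307L, 312L, 408L), 1, backend))
  }
}
```

Running the sketch prints `List(307, 408)`: executor 312 is spared even though the allocator still counts it as outstanding, which is exactly the behavior the log excerpt above was missing.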
Authored-by: “attilapiros” <piros.attila.zs...@gmail.com> Signed-off-by: Holden Karau <hka...@apple.com> (cherry picked from commit 6c5322de6176726955b4bc941f92ecaa54a7f539) Signed-off-by: Holden Karau <hka...@apple.com> --- .../scala/org/apache/spark/deploy/k8s/Config.scala | 3 +- .../cluster/k8s/ExecutorPodsAllocator.scala | 66 ++++++--- .../k8s/KubernetesClusterSchedulerBackend.scala | 2 +- .../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 155 ++++++++++++++++++++- .../KubernetesClusterSchedulerBackendSuite.scala | 2 +- 5 files changed, 204 insertions(+), 24 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 68dcef1..bec29a9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -237,7 +237,8 @@ private[spark] object Config extends Logging { val KUBERNETES_ALLOCATION_EXECUTOR_TIMEOUT = ConfigBuilder("spark.kubernetes.allocation.executor.timeout") - .doc("Time to wait before considering a pending executor timedout.") + .doc("Time to wait before a newly created executor POD request, which does not reached " + + "the POD pending state yet, considered timedout and will be deleted.") .version("3.1.0") .timeConf(TimeUnit.MILLISECONDS) .checkValue(value => value > 0, "Allocation executor timeout must be a positive time value.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index eb35de8..5fc81a6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,7 +18,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.time.Instant import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} import scala.collection.JavaConverters._ import scala.collection.mutable @@ -82,9 +82,14 @@ private[spark] class ExecutorPodsAllocator( // snapshot yet. Mapped to the (ResourceProfile id, timestamp) when they were created. private val newlyCreatedExecutors = mutable.LinkedHashMap.empty[Long, (Int, Long)] + // Executor IDs that have been requested from Kubernetes but have not been detected in any POD + // snapshot yet but already known by the scheduler backend. Mapped to the ResourceProfile id. + private val schedulerKnownNewlyCreatedExecs = mutable.LinkedHashMap.empty[Long, Int] + private val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(conf) - private val hasPendingPods = new AtomicBoolean() + // visible for tests + private[k8s] val numOutstandingPods = new AtomicInteger() private var lastSnapshot = ExecutorPodsSnapshot() @@ -93,9 +98,9 @@ private[spark] class ExecutorPodsAllocator( // if they happen to come up before the deletion takes effect. 
@volatile private var deletedExecutorIds = Set.empty[Long] - def start(applicationId: String): Unit = { + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { snapshotsStore.addSubscriber(podAllocationDelay) { - onNewSnapshots(applicationId, _) + onNewSnapshots(applicationId, schedulerBackend, _) } } @@ -105,7 +110,7 @@ private[spark] class ExecutorPodsAllocator( totalExpectedExecutorsPerResourceProfileId.put(rp.id, numExecs) } logDebug(s"Set total expected execs to $totalExpectedExecutorsPerResourceProfileId") - if (!hasPendingPods.get()) { + if (numOutstandingPods.get() == 0) { snapshotsStore.notifySubscribers() } } @@ -114,8 +119,19 @@ private[spark] class ExecutorPodsAllocator( private def onNewSnapshots( applicationId: String, + schedulerBackend: KubernetesClusterSchedulerBackend, snapshots: Seq[ExecutorPodsSnapshot]): Unit = { - newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) + val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys) + newlyCreatedExecutors --= k8sKnownExecIds + schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds + + // transfer the scheduler backend known executor requests from the newlyCreatedExecutors + // to the schedulerKnownNewlyCreatedExecs + val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet + schedulerKnownNewlyCreatedExecs ++= + newlyCreatedExecutors.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1) + newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet + // For all executors we've created against the API but have not seen in a snapshot // yet - check the current time. If the current time has exceeded some threshold, // assume that the pod was either never created (the API server never properly @@ -164,15 +180,16 @@ private[spark] class ExecutorPodsAllocator( _deletedExecutorIds = _deletedExecutorIds.filter(existingExecs.contains) } + val notDeletedPods = lastSnapshot.executorPods.filterKeys(!_deletedExecutorIds.contains(_)) // Map the pods into per ResourceProfile id so we can check per ResourceProfile, // add a fast path if not using other ResourceProfiles. val rpIdToExecsAndPodState = mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]() if (totalExpectedExecutorsPerResourceProfileId.size <= 1) { rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) = - mutable.HashMap.empty ++= lastSnapshot.executorPods + mutable.HashMap.empty ++= notDeletedPods } else { - lastSnapshot.executorPods.foreach { case (execId, execPodState) => + notDeletedPods.foreach { case (execId, execPodState) => val rpId = execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId, mutable.HashMap[Long, ExecutorPodState]()) @@ -190,24 +207,33 @@ private[spark] class ExecutorPodsAllocator( case _ => false } - val currentPendingExecutors = podsForRpId.filter { + val (schedulerKnownPendingExecsForRpId, currentPendingExecutorsForRpId) = podsForRpId.filter { case (_, PodPending(_)) => true case _ => false + }.partition { case (k, _) => + schedulerKnownExecs.contains(k) } // This variable is used later to print some debug logs. It's updated when cleaning up - // excess pod requests, since currentPendingExecutors is immutable. - var knownPendingCount = currentPendingExecutors.size + // excess pod requests, since currentPendingExecutorsForRpId is immutable. 
+ var knownPendingCount = currentPendingExecutorsForRpId.size val newlyCreatedExecutorsForRpId = newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) => rpId == waitingRpId } + val schedulerKnownNewlyCreatedExecsForRpId = + schedulerKnownNewlyCreatedExecs.filter { case (_, waitingRpId) => + rpId == waitingRpId + } + if (podsForRpId.nonEmpty) { logDebug(s"ResourceProfile Id: $rpId " + s"pod allocation status: $currentRunningCount running, " + - s"${currentPendingExecutors.size} pending. " + - s"${newlyCreatedExecutorsForRpId.size} unacknowledged.") + s"${currentPendingExecutorsForRpId.size} unknown pending, " + + s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " + + s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " + + s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created.") } // It's possible that we have outstanding pods that are outdated when dynamic allocation @@ -218,8 +244,9 @@ private[spark] class ExecutorPodsAllocator( // // TODO: with dynamic allocation off, handle edge cases if we end up with more running // executors than expected. - val knownPodCount = currentRunningCount + currentPendingExecutors.size + - newlyCreatedExecutorsForRpId.size + val knownPodCount = currentRunningCount + + currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size + + newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size if (knownPodCount > targetNum) { val excess = knownPodCount - targetNum @@ -227,7 +254,7 @@ private[spark] class ExecutorPodsAllocator( .filter { case (_, (_, createTime)) => currentTime - createTime > executorIdleTimeout }.keys.take(excess).toList - val knownPendingToDelete = currentPendingExecutors + val knownPendingToDelete = currentPendingExecutorsForRpId .filter(x => isExecutorIdleTimedOut(x._2, currentTime)) .take(excess - newlyCreatedToDelete.size) .map { case (id, _) => id } @@ -245,7 +272,7 @@ private[spark] class ExecutorPodsAllocator( .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*) .delete() - newlyCreatedExecutors --= toDelete + newlyCreatedExecutors --= newlyCreatedToDelete knownPendingCount -= knownPendingToDelete.size } } @@ -276,8 +303,9 @@ private[spark] class ExecutorPodsAllocator( deletedExecutorIds = _deletedExecutorIds // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this - // update method when not needed. - hasPendingPods.set(totalPendingCount + newlyCreatedExecutors.size > 0) + // update method when not needed. PODs known by the scheduler backend are not counted here as + // they considered running PODs and they should not block upscaling. 
+ numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size) } private def requestNewExecutors( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index c35a434..d58e38a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -93,7 +93,7 @@ private[spark] class KubernetesClusterSchedulerBackend( val initExecs = Map(defaultProfile -> initialExecutors) podAllocator.setTotalExpectedExecutors(initExecs) lifecycleEventHandler.start(this) - podAllocator.start(applicationId()) + podAllocator.start(applicationId(), this) watchEvents.start(applicationId()) pollEvents.start(applicationId()) setUpExecutorConfigMap() diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 349bbcd..55be80a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -81,6 +81,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { @Mock private var executorBuilder: KubernetesExecutorBuilder = _ + @Mock + private var schedulerBackend: KubernetesClusterSchedulerBackend = _ + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ private var podsAllocatorUnderTest: ExecutorPodsAllocator = _ @@ -96,12 +99,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock = new ManualClock(0L) podsAllocatorUnderTest = new ExecutorPodsAllocator( conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock) - podsAllocatorUnderTest.start(TEST_SPARK_APP_ID) + when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) } test("Initially request executors in batches. Do not request another batch if the" + " first has not finished.") { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1))) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (nextId <- 1 to podAllocationSize) { verify(podOperations).create(podWithAttachedContainerForId(nextId)) } @@ -111,28 +116,34 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { test("Request executors in batches. 
Allow another batch to be requested if" + " all pending executors start running.") { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1))) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (execId <- 1 until podAllocationSize) { snapshotsStore.updatePod(runningExecutor(execId)) } snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1)) snapshotsStore.updatePod(runningExecutor(podAllocationSize)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1)) snapshotsStore.updatePod(runningExecutor(podAllocationSize)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod])) } test("When a current batch reaches error states immediately, re-request" + " them on the next batch.") { podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> podAllocationSize)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (execId <- 1 until podAllocationSize) { snapshotsStore.updatePod(runningExecutor(execId)) } val failedPod = failedExecutorWithoutDeletion(podAllocationSize) snapshotsStore.updatePod(failedPod) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1)) } @@ -148,9 +159,11 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")) .thenReturn(labeledPods) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(1)) waitForExecutorPodsClock.setTime(podCreationTimeout + 1) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(labeledPods).delete() verify(podOperations).create(podWithAttachedContainerForId(2)) } @@ -174,17 +187,20 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // Target 1 executor, make sure it's requested, even with an empty initial snapshot. podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(1)) // Mark executor as running, verify that subsequent allocation cycle is a no-op. snapshotsStore.updatePod(runningExecutor(1)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(1)).create(any()) verify(podOperations, never()).delete() // Request 3 more executors, make sure all are requested. 
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) verify(podOperations).create(podWithAttachedContainerForId(2)) verify(podOperations).create(podWithAttachedContainerForId(3)) verify(podOperations).create(podWithAttachedContainerForId(4)) @@ -193,6 +209,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(runningExecutor(2)) snapshotsStore.updatePod(pendingExecutor(3)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) verify(podOperations, times(4)).create(any()) verify(podOperations, never()).delete() @@ -200,6 +217,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock.advance(executorIdleTimeout * 2) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(4)).create(any()) verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4") verify(podOperations).delete() @@ -212,6 +230,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(deletedExecutor(4)) snapshotsStore.removeDeletedExecutors() snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) assert(!podsAllocatorUnderTest.isDeleted("3")) assert(!podsAllocatorUnderTest.isDeleted("4")) } @@ -279,6 +298,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock.setTime(startTime) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) verify(podOperations).create(podWithAttachedContainerForId(1)) verify(podOperations).create(podWithAttachedContainerForId(2)) verify(podOperations).create(podWithAttachedContainerForId(3)) @@ -292,16 +312,139 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // Newly created executors (both acknowledged and not) are protected by executorIdleTimeout podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 0)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5") verify(podOperations, never()).delete() // Newly created executors (both acknowledged and not) are cleaned up. 
waitForExecutorPodsClock.advance(executorIdleTimeout * 2) + when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "3", "4")) snapshotsStore.notifySubscribers() - verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5") + // SPARK-34361: even as 1, 3 and 4 are not timed out as they are considered as known PODs so + // this is why they are not counted into the outstanding PODs and /they are not removed even + // though executor 1 is still in pending state and executor 3 and 4 are new request without + // any state reported by kubernetes and all the three are already timed out + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) + verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5") verify(podOperations).delete() } + /** + * This test covers some downscaling and upscaling of dynamic allocation on kubernetes + * along with multiple resource profiles (default and rp) when some executors + * already know by the scheduler backend. + * + * Legend: + * + * N-: newly created not known by the scheduler backend + * N+: newly created known by the scheduler backend + * P- / P+ : pending (not know / known) by the scheduler backend + * D: deleted + * | default || rp | expected + * | || | outstanding + * | 1 | 2 | 3 || 4 | 5 | 6 | 7 | PODs + * ========================================================================================== + * 0) setTotalExpectedExecs with | N- | N- | N- || N- | N- | N- | N- | + * default->3, ro->4 | | | || | | | | 7 + * ------------------------------------------------------------------------------------------ + * 1) make 1 from each rp | N+ | N- | N- || N+ | N- | N- | N- | + * known by backend | | | || | | | | 5 + * ------------------------------------------------------------------------------------------- + * 2) some more backend known + pending | N+ | P+ | P- || N+ | P+ | P- | N- | 3 + * ------------------------------------------------------------------------------------------- + * 3) advance time with idle timeout | | | || | | | | + * setTotalExpectedExecs with | N+ | P+ | D || N+ | P+ | D | D | 0 + * default->1, rp->1 | | | || | | | | + * ------------------------------------------------------------------------------------------- + * 4) setTotalExpectedExecs with | N+ | P+ | D || N+ | P+ | D | D | 0 and + * default->2, rp->2 | | | || | | | | no new POD req. 
+ * =========================================================================================== + * + * 5) setTotalExpectedExecs with default -> 3, rp -> 3 which will lead to creation of the new + * PODs: 8 and 9 + */ + test("SPARK-34361: scheduler backend known pods with multiple resource profiles at downscaling") { + when(podOperations + .withField("status.phase", "Pending")) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) + .thenReturn(podOperations) + when(podOperations + .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any())) + .thenReturn(podOperations) + + val startTime = Instant.now.toEpochMilli + waitForExecutorPodsClock.setTime(startTime) + + val rpb = new ResourceProfileBuilder() + val ereq = new ExecutorResourceRequests() + val treq = new TaskResourceRequests() + ereq.cores(4).memory("2g") + treq.cpus(2) + rpb.require(ereq).require(treq) + val rp = rpb.build() + + // 0) request 3 PODs for the default and 4 PODs for the other resource profile + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 4)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 7) + verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(3, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(4, rp.id)) + verify(podOperations).create(podWithAttachedContainerForId(5, rp.id)) + verify(podOperations).create(podWithAttachedContainerForId(6, rp.id)) + verify(podOperations).create(podWithAttachedContainerForId(7, rp.id)) + + // 1) make 1 POD known by the scheduler backend for each resource profile + when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "4")) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5, + "scheduler backend known PODs are not outstanding") + verify(podOperations, times(7)).create(any()) + + // 2) make 1 extra POD known by the scheduler backend for each resource profile + // and make some to pending + when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "2", "4", "5")) + snapshotsStore.updatePod(pendingExecutor(2, defaultProfile.id)) + snapshotsStore.updatePod(pendingExecutor(3, defaultProfile.id)) + snapshotsStore.updatePod(pendingExecutor(5, rp.id)) + snapshotsStore.updatePod(pendingExecutor(6, rp.id)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) + verify(podOperations, times(7)).create(any()) + + // 3) downscale to 1 POD for default and 1 POD for the other resource profile + waitForExecutorPodsClock.advance(executorIdleTimeout * 2) + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) + verify(podOperations, times(7)).create(any()) + verify(podOperations, times(2)).delete() + assert(podsAllocatorUnderTest.isDeleted("3")) + assert(podsAllocatorUnderTest.isDeleted("6")) + assert(podsAllocatorUnderTest.isDeleted("7")) + + // 4) upscale to 2 PODs for default and 2 for the other resource profile but as there is still + // 2 PODs known by the scheduler backend there must be no new POD requested to be created + 
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 2)) + snapshotsStore.notifySubscribers() + verify(podOperations, times(7)).create(any()) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) + verify(podOperations, times(7)).create(any()) + + // 5) requesting 1 more executor for each resource + podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 3)) + snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) + verify(podOperations, times(9)).create(any()) + verify(podOperations).create(podWithAttachedContainerForId(8, defaultProfile.id)) + verify(podOperations).create(podWithAttachedContainerForId(9, rp.id)) + } + test("SPARK-33288: multiple resource profiles") { when(podOperations .withField("status.phase", "Pending")) @@ -330,6 +473,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // Target 1 executor for default profile, 2 for other profile, // make sure it's requested, even with an empty initial snapshot. podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 2)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id)) verify(podOperations).create(podWithAttachedContainerForId(2, rp.id)) verify(podOperations).create(podWithAttachedContainerForId(3, rp.id)) @@ -339,6 +483,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(runningExecutor(2, rp.id)) snapshotsStore.updatePod(runningExecutor(3, rp.id)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(3)).create(any()) verify(podOperations, never()).delete() @@ -346,6 +491,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // make sure all are requested. 
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, rp -> 3)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4) verify(podOperations).create(podWithAttachedContainerForId(4, defaultProfile.id)) verify(podOperations).create(podWithAttachedContainerForId(5, defaultProfile.id)) verify(podOperations).create(podWithAttachedContainerForId(6, defaultProfile.id)) @@ -356,6 +502,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(pendingExecutor(5, defaultProfile.id)) snapshotsStore.updatePod(pendingExecutor(7, rp.id)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) verify(podOperations, times(7)).create(any()) verify(podOperations, never()).delete() @@ -364,6 +511,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock.advance(executorIdleTimeout * 2) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1)) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(7)).create(any()) verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6") verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7") @@ -379,6 +527,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { snapshotsStore.updatePod(deletedExecutor(7)) snapshotsStore.removeDeletedExecutors() snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) assert(!podsAllocatorUnderTest.isDeleted("5")) assert(!podsAllocatorUnderTest.isDeleted("6")) assert(!podsAllocatorUnderTest.isDeleted("7")) @@ -399,6 +548,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { .thenReturn(podOperations) podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6)) + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) // Initial request of pods verify(podOperations).create(podWithAttachedContainerForId(1)) verify(podOperations).create(podWithAttachedContainerForId(2)) @@ -414,6 +564,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // We move forward one allocation cycle waitForExecutorPodsClock.setTime(podAllocationDelay + 1) snapshotsStore.notifySubscribers() + assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2) // We request pod 6 verify(podOperations).create(podWithAttachedContainerForId(6)) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index 861d41c..e4a73e2 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -127,7 +127,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn test("Start all components") { schedulerBackendUnderTest.start() verify(podAllocator).setTotalExpectedExecutors(Map(defaultProfile -> 3)) - verify(podAllocator).start(TEST_SPARK_APP_ID) + verify(podAllocator).start(TEST_SPARK_APP_ID, schedulerBackendUnderTest) 
verify(lifecycleEventHandler).start(schedulerBackendUnderTest) verify(watchEvents).start(TEST_SPARK_APP_ID) verify(pollEvents).start(TEST_SPARK_APP_ID)
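For reference, a small usage sketch (not part of the commit) of the `spark.kubernetes.allocation.executor.timeout` setting whose documentation is reworded in the Config.scala hunk above. The `600s` value is purely illustrative, and in practice this would typically be passed via `--conf` on `spark-submit` rather than set in code.

```scala
import org.apache.spark.SparkConf

object AllocationTimeoutConfExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      // Executor pod requests that have not reached the pending phase within this
      // time are considered timed out and are deleted by the pod allocator.
      .set("spark.kubernetes.allocation.executor.timeout", "600s")
    println(conf.get("spark.kubernetes.allocation.executor.timeout"))
  }
}
```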