This is an automated email from the ASF dual-hosted git repository.
sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new d4bc277690f2 [SPARK-54173][K8S] Add support for Deployment API on K8s
d4bc277690f2 is described below
commit d4bc277690f2391e860a955bf28275c02eb33404
Author: Victor Sunderland <[email protected]>
AuthorDate: Mon Nov 10 20:19:49 2025 -0800
[SPARK-54173][K8S] Add support for Deployment API on K8s
### What changes were proposed in this pull request?
Adds support for K8s `Deployment` API to allocate pods.
### Why are the changes needed?
Allocating individual pods is not ideal; we can allocate through higher-level
APIs instead. #33508 helps by adding an interface for arbitrary allocators
and a StatefulSet allocator. However, dynamic allocation then only works if
you have implemented a PodDisruptionBudget associated with the decommission
label. Since Deployment uses ReplicaSet, which supports the
`pod-deletion-cost` annotation, we can avoid needing to create a separate PDB
resource, and allow dynamic allocation (w/ shuffle t [...]
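The mechanism relies on the KEP-2255 `pod-deletion-cost` annotation: when a ReplicaSet scales down, pods with a lower cost are preferred for deletion. A minimal sketch of what the annotation looks like on an executor pod (the pod name and cost value here are illustrative, not from this patch):

```
# Illustrative executor pod metadata; the ReplicaSet controller prefers to
# delete pods with lower pod-deletion-cost when scaling down (KEP-2255).
apiVersion: v1
kind: Pod
metadata:
  name: spark-exec-1   # hypothetical pod name
  annotations:
    controller.kubernetes.io/pod-deletion-cost: "-100"
```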
### Does this PR introduce _any_ user-facing change?
Yes, this adds a user-facing config:
```
spark.kubernetes.executor.podDeletionCost
```
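A sketch of a configuration that exercises this code path (the cost value is illustrative; the other keys are the existing allocator and dynamic-allocation configs):

```
spark.kubernetes.allocation.pods.allocator=deployment
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.shuffleTracking.enabled=true
spark.kubernetes.executor.podDeletionCost=-100
```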
### How was this patch tested?
New unit tests + passing existing unit tests + tested in a cluster with
shuffle tracking and dynamic allocation enabled
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52867 from ForVic/dev/victors/deployment_allocator.
Lead-authored-by: Victor Sunderland <[email protected]>
Co-authored-by: victors-oai <[email protected]>
Co-authored-by: Victor Sunderland <[email protected]>
Signed-off-by: Chao Sun <[email protected]>
---
docs/running-on-kubernetes.md | 18 +-
.../scala/org/apache/spark/deploy/k8s/Config.scala | 17 +-
.../k8s/features/BasicExecutorFeatureStep.scala | 11 +-
.../cluster/k8s/DeploymentPodsAllocator.scala | 200 +++++++++++++++++++++
.../cluster/k8s/KubernetesClusterManager.scala | 14 +-
.../k8s/KubernetesClusterSchedulerBackend.scala | 35 ++++
.../apache/spark/deploy/k8s/Fabric8Aliases.scala | 7 +-
.../features/BasicExecutorFeatureStepSuite.scala | 21 +++
.../cluster/k8s/DeploymentAllocatorSuite.scala | 190 ++++++++++++++++++++
.../k8s/KubernetesClusterManagerSuite.scala | 51 +++++-
.../KubernetesClusterSchedulerBackendSuite.scala | 75 ++++++--
11 files changed, 605 insertions(+), 34 deletions(-)
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index bdc2a1a156b0..ababe38e4e25 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1474,6 +1474,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>3.3.0</td>
</tr>
+<tr>
+ <td><code>spark.kubernetes.executor.podDeletionCost</code></td>
+ <td>(none)</td>
+ <td>
+    Value to apply to the <code>controller.kubernetes.io/pod-deletion-cost</code> annotation
+    when Spark tells a deployment-based allocator to remove executor pods. Set this to steer
+    Kubernetes to remove the same pods that Spark selected when the deployment scales down.
+ </td>
+ <td>4.2.0</td>
+</tr>
<tr>
<td><code>spark.kubernetes.executor.scheduler.name</code></td>
<td>(none)</td>
@@ -1654,10 +1664,10 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.kubernetes.allocation.pods.allocator</code></td>
<td><code>direct</code></td>
<td>
-    Allocator to use for pods. Possible values are <code>direct</code> (the default)
-    and <code>statefulset</code>, or a full class name of a class implementing `AbstractPodsAllocator`.
-    Future version may add Job or replicaset. This is a developer API and may change
-    or be removed at anytime.
+    Allocator to use for pods. Possible values are <code>direct</code> (the default),
+    <code>statefulset</code>, <code>deployment</code>, or a full class name of a class
+    implementing `AbstractPodsAllocator`. Future versions may add Job or ReplicaSet.
+    This is a developer API and may change or be removed at any time.
</td>
<td>3.3.0</td>
</tr>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index fafff5046b9d..1fb214c87c53 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_ENABLED}
private[spark] object Config extends Logging {
@@ -462,14 +462,25 @@ private[spark] object Config extends Logging {
val KUBERNETES_ALLOCATION_PODS_ALLOCATOR =
ConfigBuilder("spark.kubernetes.allocation.pods.allocator")
-      .doc("Allocator to use for pods. Possible values are direct (the default) and statefulset " +
-        ", or a full class name of a class implementing AbstractPodsAllocator. " +
+      .doc("Allocator to use for pods. Possible values are direct (the default), statefulset," +
+        " deployment, or a full class name of a class implementing AbstractPodsAllocator. " +
         "Future version may add Job or replicaset. This is a developer API and may change " +
"or be removed at anytime.")
.version("3.3.0")
.stringConf
.createWithDefault("direct")
+  val KUBERNETES_EXECUTOR_POD_DELETION_COST =
+    ConfigBuilder("spark.kubernetes.executor.podDeletionCost")
+      .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" +
+        " annotation when Spark asks a deployment-based allocator to remove executor pods. This " +
+        "helps Kubernetes pick the same pods Spark selected when the deployment scales down." +
+        s" This should only be enabled when both ${KUBERNETES_ALLOCATION_PODS_ALLOCATOR.key} is set to " +
+        s"deployment, and ${DYN_ALLOCATION_ENABLED.key} is enabled.")
+      .version("4.2.0")
+      .intConf
+      .createOptional
+
val KUBERNETES_ALLOCATION_BATCH_SIZE =
ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 13d1f1bc98a0..5f61c014127a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.features
+import java.util.Locale
+
import scala.jdk.CollectionConverters._
import io.fabric8.kubernetes.api.model._
@@ -115,12 +117,17 @@ private[spark] class BasicExecutorFeatureStep(
     // hostname must be no longer than `KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH`(63) characters,
// so take the last 63 characters of the pod name as the hostname.
// This preserves uniqueness since the end of name contains executorId
-    val hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
+    var hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
// Remove non-word characters from the start of the hostname
.replaceAll("^[^\\w]+", "")
       // Replace dangerous characters in the remaining string with a safe alternative.
.replaceAll("[^\\w-]+", "_")
+    // Deployment resource does not support uppercase characters in the hostname
+    if (kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
+ hostname = hostname.toLowerCase(Locale.ROOT)
+ }
+
     val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
val executorCpuQuantity = new Quantity(executorCoresRequest)
val executorResourceQuantities =
@@ -270,7 +277,7 @@ private[spark] class BasicExecutorFeatureStep(
}
     val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
- case "statefulset" => "Always"
+ case "statefulset" | "deployment" => "Always"
case _ => "Never"
}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala
new file mode 100644
index 000000000000..213d12301a9c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodSpecBuilder, PodTemplateSpec}
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.util.{Clock, Utils}
+
+/**
+ * A pods allocator backed by Kubernetes Deployments.
+ *
+ * The Deployment controller honours the `controller.kubernetes.io/pod-deletion-cost`
+ * annotation, so executors selected by Spark for removal can be prioritised when the
+ * deployment scales down. This provides predictable downscale behaviour for dynamic
+ * allocation that is not possible with StatefulSets, which only remove pods in ordinal order.
+ */
+class DeploymentPodsAllocator(
+ conf: SparkConf,
+ secMgr: SecurityManager,
+ executorBuilder: KubernetesExecutorBuilder,
+ kubernetesClient: KubernetesClient,
+ snapshotsStore: ExecutorPodsSnapshotsStore,
+ clock: Clock) extends AbstractPodsAllocator() with Logging {
+
+ private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
+
+  private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)
+
+ private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
+ private val kubernetesDriverPodName = conf.get(KUBERNETES_DRIVER_POD_NAME)
+
+ val driverPod: Option[Pod] = kubernetesDriverPodName
+ .map(name => Option(kubernetesClient.pods()
+ .inNamespace(namespace)
+ .withName(name)
+ .get())
+    .getOrElse(throw new SparkException(
+      s"No pod was found named $name in the cluster in the " +
+      s"namespace $namespace (this was supposed to be the driver pod).")))
+
+ private var appId: String = _
+
+ private val deploymentsCreated = new mutable.HashSet[Int]()
+
+  private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"
+
+ override def start(
+ applicationId: String,
+ schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+ appId = applicationId
+ driverPod.foreach { pod =>
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withName(pod.getMetadata.getName)
+ .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+ }
+ }
+ }
+
+ override def setTotalExpectedExecutors(
+ resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+ if (appId == null) {
+      throw new SparkException("setTotalExpectedExecutors called before start of allocator.")
+ }
+ resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
+ rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
+ setTargetExecutorsDeployment(numExecs, appId, rp.id)
+ }
+ }
+
+ override def isDeleted(executorId: String): Boolean = false
+
+  private def setName(applicationId: String, resourceProfileId: Int): String = {
+ s"spark-d-$applicationId-$resourceProfileId"
+ }
+
+ private def setTargetExecutorsDeployment(
+ expected: Int,
+ applicationId: String,
+ resourceProfileId: Int): Unit = {
+ if (deploymentsCreated.contains(resourceProfileId)) {
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+ .withName(setName(applicationId, resourceProfileId))
+ .scale(expected)
+ } else {
+ val executorConf = KubernetesConf.createExecutorConf(
+ conf,
+ "EXECID",
+ applicationId,
+ driverPod,
+ resourceProfileId)
+ val resolvedExecutorSpec = executorBuilder.buildFromFeatures(
+ executorConf,
+ secMgr,
+ kubernetesClient,
+ rpIdToResourceProfile(resourceProfileId))
+ val executorPod = resolvedExecutorSpec.pod
+
+ val podSpecBuilder = executorPod.pod.getSpec match {
+ case null => new PodSpecBuilder()
+ case s => new PodSpecBuilder(s)
+ }
+ val podWithAttachedContainer: PodSpec = podSpecBuilder
+ .addToContainers(executorPod.container)
+ .build()
+
+ val meta = executorPod.pod.getMetadata
+ val resources = resolvedExecutorSpec.executorKubernetesResources
+ val failureMessage =
+        "PersistentVolumeClaims are not supported with the deployment allocator. " +
+          "Please remove PVC requirements or choose a different pods allocator."
+      val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim")
+ if (dynamicVolumeClaims.nonEmpty) {
+ throw new SparkException(failureMessage)
+ }
+ val staticVolumeClaims = Option(podWithAttachedContainer.getVolumes)
+ .map(_.asScala.filter(_.getPersistentVolumeClaim != null))
+ .getOrElse(Seq.empty)
+ if (staticVolumeClaims.nonEmpty) {
+ throw new SparkException(failureMessage)
+ }
+
+ val currentAnnotations = Option(meta.getAnnotations)
+ .map(_.asScala).getOrElse(Map.empty[String, String])
+ if (!currentAnnotations.contains(podDeletionCostAnnotation)) {
+        val newAnnotations = currentAnnotations.concat(Seq(podDeletionCostAnnotation -> "0"))
+ meta.setAnnotations(newAnnotations.asJava)
+ }
+
+ val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer)
+
+ val deployment = new DeploymentBuilder()
+ .withNewMetadata()
+ .withName(setName(applicationId, resourceProfileId))
+ .withNamespace(namespace)
+ .endMetadata()
+ .withNewSpec()
+ .withReplicas(expected)
+ .withNewSelector()
+ .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
+ .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+          .addToMatchLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfileId.toString)
+ .endSelector()
+ .withTemplate(podTemplateSpec)
+ .endSpec()
+ .build()
+
+ addOwnerReference(driverPod.get, Seq(deployment))
+
+    kubernetesClient.apps().deployments().inNamespace(namespace).resource(deployment).create()
+ deploymentsCreated += resourceProfileId
+ }
+ }
+
+ override def stop(applicationId: String): Unit = {
+ deploymentsCreated.foreach { rpid =>
+ Utils.tryLogNonFatalError {
+ kubernetesClient
+ .apps()
+ .deployments()
+ .inNamespace(namespace)
+ .withName(setName(applicationId, rpid))
+ .delete()
+ }
+ }
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 3fb1ed0c9c0f..15fcfe001a58 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
import io.fabric8.kubernetes.client.Config
import io.fabric8.kubernetes.client.KubernetesClient
-import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkMasterRegex}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
@@ -160,9 +160,19 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
private[k8s] def makeExecutorPodsAllocator(sc: SparkContext,
kubernetesClient: KubernetesClient,
snapshotsStore: ExecutorPodsSnapshotsStore) = {
-    val executorPodsAllocatorName = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
+    val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
+    if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) &&
+      sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) {
+      throw new SparkException(
+        s"Dynamic allocation with the deployment pods allocator requires " +
+        s"'${KUBERNETES_EXECUTOR_POD_DELETION_COST.key}' to be configured.")
+ }
+
+ val executorPodsAllocatorName = allocator match {
case "statefulset" =>
classOf[StatefulSetPodsAllocator].getName
+ case "deployment" =>
+ classOf[DeploymentPodsAllocator].getName
case "direct" =>
classOf[ExecutorPodsAllocator].getName
case fullClass =>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index aacd8b84199e..a6d9d23fb8df 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -73,6 +73,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
private val namespace = conf.get(KUBERNETES_NAMESPACE)
+  // KEP-2255: When a Deployment or ReplicaSet is scaled down, the pods will be deleted in the
+  // order of the value of this annotation, ascending.
+  private val podDeletionCostAnnotation = "controller.kubernetes.io/pod-deletion-cost"
+
// Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
removeExecutor(executorId, reason)
@@ -195,6 +199,31 @@ private[spark] class KubernetesClusterSchedulerBackend(
super.getExecutorIds()
}
+ private def annotateExecutorDeletionCost(execIds: Seq[String]): Unit = {
+ conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).foreach { cost =>
+      logInfo(s"Annotating executor pod(s) ${execIds.mkString(",")} with deletion cost $cost")
+ val annotateTask = new Runnable() {
+ override def run(): Unit = Utils.tryLogNonFatalError {
+ kubernetesClient
+ .pods()
+ .inNamespace(namespace)
+ .withLabel(SPARK_APP_ID_LABEL, applicationId())
+ .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+ .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
+ .resources()
+ .forEach { podResource =>
+ podResource.edit({ p: Pod =>
+ new PodBuilder(p).editOrNewMetadata()
+ .addToAnnotations(podDeletionCostAnnotation, cost.toString)
+ .endMetadata()
+ .build()})
+ }
+ }
+ }
+ executorService.execute(annotateTask)
+ }
+ }
+
private def labelDecommissioningExecs(execIds: Seq[String]) = {
// Only kick off the labeling task if we have a label.
conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
@@ -228,6 +257,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
// picked the pod to evict so we don't need to update the labels.
if (!triggeredByExecutor) {
labelDecommissioningExecs(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
+      if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
+        annotateExecutorDeletionCost(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
+ }
}
super.decommissionExecutors(executorsAndDecomInfo,
adjustTargetNumExecutors,
triggeredByExecutor)
@@ -235,6 +267,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
     // If we've decided to remove some executors we should tell Kubernetes that we don't care.
+ if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
+ annotateExecutorDeletionCost(executorIds)
+ }
labelDecommissioningExecs(executorIds)
// Tell the executors to exit themselves.
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
index 1a4bc9781da2..86deb3e52bae 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy.k8s
 import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
-import io.fabric8.kubernetes.api.model.apps.StatefulSet
-import io.fabric8.kubernetes.api.model.apps.StatefulSetList
+import io.fabric8.kubernetes.api.model.apps.{Deployment, DeploymentList, StatefulSet, StatefulSetList}
 import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource}
object Fabric8Aliases {
@@ -38,6 +37,10 @@ object Fabric8Aliases {
   type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
   type STATEFUL_SETS_NAMESPACED =
     NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
+ type DEPLOYMENT_RES = RollableScalableResource[Deployment]
+ type DEPLOYMENTS = MixedOperation[Deployment, DeploymentList, DEPLOYMENT_RES]
+ type DEPLOYMENTS_NAMESPACED =
+ NonNamespaceOperation[Deployment, DeploymentList, DEPLOYMENT_RES]
   type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
     Resource[PersistentVolumeClaim]]
   type PVC_WITH_NAMESPACE = NonNamespaceOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index ced1326e7938..b8b5da192a09 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.features
+import java.util.Locale
+
import scala.jdk.CollectionConverters._
import com.google.common.net.InternetDomainName
@@ -26,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSui
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SecretVolumeUtils, SparkPod}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
 import org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
import org.apache.spark.internal.config
import org.apache.spark.internal.config._
@@ -266,6 +269,24 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter {
}
}
+  test("deployment allocator uses restartPolicy Always and lowercase hostnames") {
+ baseConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+ initDefaultProfile(baseConf)
+ val executorConf = KubernetesConf.createExecutorConf(
+ sparkConf = baseConf,
+ executorId = "EXECID",
+ appId = KubernetesTestConf.APP_ID,
+ driverPod = Some(DRIVER_POD))
+    val step = new BasicExecutorFeatureStep(executorConf, new SecurityManager(baseConf),
+      defaultProfile)
+ val executor = step.configurePod(SparkPod.initialPod())
+
+ val hostname = executor.pod.getSpec.getHostname
+ assert(hostname === hostname.toLowerCase(Locale.ROOT))
+ assert(InternetDomainName.isValid(hostname))
+ assert(executor.pod.getSpec.getRestartPolicy === "Always")
+ }
+
   test("classpath and extra java options get translated into environment variables") {
baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala
new file mode 100644
index 000000000000..2166cef9d73c
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.Deployment
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{AppsAPIGroupDSL, PodResource}
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.Mockito.{never, times, verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.resource.{ResourceProfile, ResourceProfileBuilder}
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class DeploymentAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
+
+ private val driverPodName = "driver"
+
+ private val driverPod = new PodBuilder()
+ .withNewMetadata()
+ .withName(driverPodName)
+ .withUid("driver-pod-uid")
+ .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+ .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+ .endMetadata()
+ .build()
+
+ private val conf = new SparkConf()
+ .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+ .set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+
+ private val secMgr = new SecurityManager(conf)
+ private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
+
+ private val schedulerBackendAppId = "testapp"
+
+ @Mock private var kubernetesClient: KubernetesClient = _
+ @Mock private var appsClient: AppsAPIGroupDSL = _
+ @Mock private var deployments: DEPLOYMENTS = _
+ @Mock private var deploymentsNamespaced: DEPLOYMENTS_NAMESPACED = _
+ @Mock private var deploymentResource: DEPLOYMENT_RES = _
+ @Mock private var pods: PODS = _
+ @Mock private var podsNamespaced: PODS_WITH_NAMESPACE = _
+ @Mock private var driverPodResource: PodResource = _
+ @Mock private var executorPodResource: PodResource = _
+ @Mock private var executorBuilder: KubernetesExecutorBuilder = _
+ @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+
+ private var allocator: DeploymentPodsAllocator = _
+ private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+
+ before {
+ MockitoAnnotations.openMocks(this).close()
+ when(kubernetesClient.apps()).thenReturn(appsClient)
+ when(appsClient.deployments()).thenReturn(deployments)
+ when(deployments.inNamespace("default")).thenReturn(deploymentsNamespaced)
+    when(deploymentsNamespaced.resource(any(classOf[Deployment]))).thenReturn(deploymentResource)
+    when(deploymentsNamespaced.withName(any[String]())).thenReturn(deploymentResource)
+
+ when(kubernetesClient.pods()).thenReturn(pods)
+ when(pods.inNamespace("default")).thenReturn(podsNamespaced)
+ when(podsNamespaced.withName(driverPodName)).thenReturn(driverPodResource)
+    when(podsNamespaced.resource(any(classOf[Pod]))).thenReturn(executorPodResource)
+ when(driverPodResource.get).thenReturn(driverPod)
+ when(driverPodResource.waitUntilReady(any(), any())).thenReturn(driverPod)
+ when(executorBuilder.buildFromFeatures(
+ any(classOf[KubernetesExecutorConf]),
+ meq(secMgr),
+ meq(kubernetesClient),
+ any(classOf[ResourceProfile])))
+ .thenAnswer { invocation =>
+ val k8sConf = invocation.getArgument[KubernetesExecutorConf](0)
+ KubernetesExecutorSpec(
+ executorPodWithId(0, k8sConf.resourceProfileId),
+ Seq.empty)
+ }
+
+ snapshotsStore = new DeterministicExecutorPodsSnapshotsStore
+ allocator = new DeploymentPodsAllocator(
+ conf,
+ secMgr,
+ executorBuilder,
+ kubernetesClient,
+ snapshotsStore,
+ snapshotsStore.clock)
+
+ when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
+ allocator.start(TEST_SPARK_APP_ID, schedulerBackend)
+ }
+
+ after {
+ ResourceProfile.clearDefaultProfile()
+ }
+
+  test("creates deployments per resource profile and seeds deletion cost annotation") {
+ val rpBuilder = new ResourceProfileBuilder()
+ val secondProfile = rpBuilder.build()
+
+ allocator.setTotalExpectedExecutors(
+ Map(defaultProfile -> 3, secondProfile -> 2))
+
+ val captor = ArgumentCaptor.forClass(classOf[Deployment])
+ verify(deploymentsNamespaced, times(2)).resource(captor.capture())
+ verify(deploymentResource, times(2)).create()
+
+ val createdDeployments = captor.getAllValues.asScala
+ createdDeployments.foreach { deployment =>
+ assert(deployment.getMetadata.getNamespace === "default")
+      assert(deployment.getSpec.getTemplate.getMetadata
+        .getAnnotations.get("controller.kubernetes.io/pod-deletion-cost") === "0")
+      assert(deployment.getSpec.getTemplate.getSpec.getContainers.asScala.exists(
+        _.getName == "spark-executor"))
+      val selectorLabels = deployment.getSpec.getSelector.getMatchLabels.asScala
+ assert(selectorLabels(SPARK_APP_ID_LABEL) === TEST_SPARK_APP_ID)
+ assert(selectorLabels(SPARK_ROLE_LABEL) === SPARK_POD_EXECUTOR_ROLE)
+ }
+ }
+
+ test("scales existing deployment when replicas change") {
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 5))
+ verify(deploymentResource, times(1)).create()
+
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 7))
+ verify(deploymentResource).scale(7)
+ }
+
+ test("throws when executor template contributes dynamic PVCs") {
+ val pvc = persistentVolumeClaim("spark-pvc", "standard", "1Gi")
+ when(executorBuilder.buildFromFeatures(
+ any(classOf[KubernetesExecutorConf]),
+ meq(secMgr),
+ meq(kubernetesClient),
+ any(classOf[ResourceProfile])))
+ .thenReturn(KubernetesExecutorSpec(
+ executorPodWithId(0),
+ Seq(pvc)))
+
+ val error = intercept[SparkException] {
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+ }
+ assert(error.getMessage.contains("PersistentVolumeClaims are not supported"))
+ verify(deploymentResource, never()).create()
+ }
+
+ test("throws when executor template includes static PVC references") {
+ when(executorBuilder.buildFromFeatures(
+ any(classOf[KubernetesExecutorConf]),
+ meq(secMgr),
+ meq(kubernetesClient),
+ any(classOf[ResourceProfile])))
+ .thenReturn(KubernetesExecutorSpec(
+ executorPodWithIdAndVolume(0),
+ Seq.empty))
+
+ val error = intercept[SparkException] {
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+ }
+ assert(error.getMessage.contains("PersistentVolumeClaims are not supported"))
+ verify(deploymentResource, never()).create()
+ }
+
+ test("deletes deployments on stop") {
+ allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+ allocator.stop(schedulerBackendAppId)
+ verify(deploymentResource).delete()
+ }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
index 07410b6a7b71..78e0942cfb82 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark._
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
import org.apache.spark.scheduler.local.LocalSchedulerBackend
class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
@@ -39,27 +40,33 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
@Mock
private var env: SparkEnv = _
- @Mock
private var sparkConf: SparkConf = _
before {
MockitoAnnotations.openMocks(this).close()
+ sparkConf = new SparkConf(false)
+ .set("spark.app.id", TEST_SPARK_APP_ID)
+ .set("spark.master", "k8s://test")
when(sc.conf).thenReturn(sparkConf)
- when(sc.conf.get(KUBERNETES_DRIVER_POD_NAME)).thenReturn(None)
- when(sc.conf.get(EXECUTOR_INSTANCES)).thenReturn(None)
- when(sc.conf.get(MAX_EXECUTOR_FAILURES)).thenReturn(None)
- when(sc.conf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS)).thenReturn(None)
when(sc.env).thenReturn(env)
+ when(env.securityManager).thenReturn(new SecurityManager(sparkConf))
+ resetDynamicAllocatorConfig()
+ }
+
+ after {
+ resetDynamicAllocatorConfig()
}
test("constructing a AbstractPodsAllocator works") {
- val validConfigs = List("statefulset", "direct",
+ val validConfigs = List("statefulset", "deployment", "direct",
classOf[StatefulSetPodsAllocator].getName,
+ classOf[DeploymentPodsAllocator].getName,
classOf[ExecutorPodsAllocator].getName)
validConfigs.foreach { c =>
val manager = new KubernetesClusterManager()
- when(sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)).thenReturn(c)
+ sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, c)
manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+ sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
}
}
@@ -80,4 +87,34 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
assert(backend2.isInstanceOf[LocalSchedulerBackend])
assert(backend2.applicationId() === "user-app-id")
}
+
+ test("deployment allocator with dynamic allocation requires deletion cost") {
+ val manager = new KubernetesClusterManager()
+ sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+ sparkConf.set(DYN_ALLOCATION_ENABLED.key, "true")
+ sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+ sparkConf.set("spark.shuffle.service.enabled", "true")
+
+ val e = intercept[SparkException] {
+ manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+ }
+ assert(e.getMessage.contains(KUBERNETES_EXECUTOR_POD_DELETION_COST.key))
+ }
+
+ test("deployment allocator with dynamic allocation and deletion cost succeeds") {
+ val manager = new KubernetesClusterManager()
+ sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+ sparkConf.set(DYN_ALLOCATION_ENABLED.key, "true")
+ sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 1)
+ sparkConf.set("spark.shuffle.service.enabled", "true")
+
+ manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+ }
+
+ private def resetDynamicAllocatorConfig(): Unit = {
+ sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
+ sparkConf.remove(DYN_ALLOCATION_ENABLED.key)
+ sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+ sparkConf.remove("spark.shuffle.service.enabled")
+ }
}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b2e4a7182a77..76c2e16782c1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -18,14 +18,17 @@ package org.apache.spark.scheduler.cluster.k8s
import java.util.Arrays
import java.util.concurrent.TimeUnit
+import java.util.function.UnaryOperator
-import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodList}
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodBuilder, PodList}
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
import org.jmock.lib.concurrent.DeterministicScheduler
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
-import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.Mockito.{atLeastOnce, mock, never, spy, verify, when}
import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
@@ -205,17 +208,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))
verify(labeledPods, never()).delete()
- verify(pod1op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
- verify(pod2op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+ verify(pod1op, never()).edit(any(classOf[UnaryOperator[Pod]]))
+ verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
TimeUnit.MILLISECONDS)
verify(labeledPods, never()).delete()
- verify(pod1op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
- verify(pod2op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+ verify(pod1op, never()).edit(any(classOf[UnaryOperator[Pod]]))
+ verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
when(labeledPods.resources()).thenReturn(Arrays.asList(pod1op).stream)
val podList = mock(classOf[PodList])
@@ -227,16 +226,64 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
verify(labeledPods, never()).delete()
schedulerExecutorService.runUntilIdle()
- verify(pod1op).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
- verify(pod2op, never()).edit(any(
- classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+ verify(pod1op).edit(any(classOf[UnaryOperator[Pod]]))
+ verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
verify(labeledPods, never()).delete()
schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
TimeUnit.MILLISECONDS)
verify(labeledPods).delete()
}
+ test("Annotates executor pods with deletion cost when configured") {
+ sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 7)
+ schedulerBackendUnderTest.start()
+
+ when(podsWithNamespace.withField(any(), any())).thenReturn(labeledPods)
+ when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+ when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+ when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+ when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3")).thenReturn(labeledPods)
+
+ val podResource = mock(classOf[PodResource])
+ val basePod = new PodBuilder()
+ .withNewMetadata()
+ .withName("exec-3")
+ .withNamespace("default")
+ .endMetadata()
+ .build()
+
+ val editCaptor = ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
+ when(podResource.edit(any(classOf[UnaryOperator[Pod]]))).thenAnswer { invocation =>
+ val fn = invocation.getArgument[UnaryOperator[Pod]](0)
+ fn.apply(basePod)
+ }
+
+ when(labeledPods.resources())
+ .thenAnswer(_ => java.util.stream.Stream.of[PodResource](podResource))
+
+ val method = classOf[KubernetesClusterSchedulerBackend]
+ .getDeclaredMethods
+ .find(_.getName == "annotateExecutorDeletionCost")
+ .get
+ method.setAccessible(true)
+ method.invoke(schedulerBackendUnderTest, Seq("3"))
+ schedulerExecutorService.runUntilIdle()
+
+ verify(podResource, atLeastOnce()).edit(editCaptor.capture())
+ val appliedPods = editCaptor.getAllValues.asScala
+ .scanLeft(basePod)((pod, fn) => fn.apply(pod))
+ .tail
+ val annotated = appliedPods
+ .find(_.getMetadata.getAnnotations.asScala
+ .contains("controller.kubernetes.io/pod-deletion-cost"))
+ assert(annotated.isDefined,
+ s"expected controller.kubernetes.io/pod-deletion-cost annotation, got annotations " +
+ s"${appliedPods.map(_.getMetadata.getAnnotations).asJava}")
+ val annotations = annotated.get.getMetadata.getAnnotations.asScala
+ assert(annotations("controller.kubernetes.io/pod-deletion-cost") === "7")
+ sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+ }
+
test("SPARK-34407: CoarseGrainedSchedulerBackend.stop may throw SparkException") {
schedulerBackendUnderTest.start()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]