This is an automated email from the ASF dual-hosted git repository.

sunchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d4bc277690f2 [SPARK-54173][K8S] Add support for Deployment API on K8s
d4bc277690f2 is described below

commit d4bc277690f2391e860a955bf28275c02eb33404
Author: Victor Sunderland <[email protected]>
AuthorDate: Mon Nov 10 20:19:49 2025 -0800

    [SPARK-54173][K8S] Add support for Deployment API on K8s
    
    ### What changes were proposed in this pull request?
    
    Adds support for K8s `Deployment` API to allocate pods.
    
    ### Why are the changes needed?
    
    Allocating individual pods is not ideal, and we can allocate with higher 
level APIs. #33508 helps this by adding an interface for arbitrary allocators 
and adds a statefulset allocator. However, dynamic allocation only works if you 
have implemented a PodDisruptionBudget associated with the decommission label. 
Since Deployment uses ReplicaSet, which supports `pod-deletion-cost` 
annotation, we can avoid needing to create a separate PDB resource, and allow 
dynamic allocation (w/ shuffle t [...]
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, adds user-facing configs
    ```
    spark.kubernetes.executor.podDeletionCost
    ```
    
    ### How was this patch tested?
    New unit tests + passing existing unit tests + tested in a cluster with 
shuffle tracking and dynamic allocation enabled
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #52867 from ForVic/dev/victors/deployment_allocator.
    
    Lead-authored-by: Victor Sunderland <[email protected]>
    Co-authored-by: victors-oai <[email protected]>
    Co-authored-by: Victor Sunderland <[email protected]>
    Signed-off-by: Chao Sun <[email protected]>
---
 docs/running-on-kubernetes.md                      |  18 +-
 .../scala/org/apache/spark/deploy/k8s/Config.scala |  17 +-
 .../k8s/features/BasicExecutorFeatureStep.scala    |  11 +-
 .../cluster/k8s/DeploymentPodsAllocator.scala      | 200 +++++++++++++++++++++
 .../cluster/k8s/KubernetesClusterManager.scala     |  14 +-
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  35 ++++
 .../apache/spark/deploy/k8s/Fabric8Aliases.scala   |   7 +-
 .../features/BasicExecutorFeatureStepSuite.scala   |  21 +++
 .../cluster/k8s/DeploymentAllocatorSuite.scala     | 190 ++++++++++++++++++++
 .../k8s/KubernetesClusterManagerSuite.scala        |  51 +++++-
 .../KubernetesClusterSchedulerBackendSuite.scala   |  75 ++++++--
 11 files changed, 605 insertions(+), 34 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index bdc2a1a156b0..ababe38e4e25 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1474,6 +1474,16 @@ See the [configuration page](configuration.html) for 
information on Spark config
   </td>
   <td>3.3.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.podDeletionCost</code></td>
+  <td>(none)</td>
+  <td>
+    Value to apply to the 
<code>controller.kubernetes.io/pod-deletion-cost</code> annotation
+    when Spark tells a deployment-based allocator to remove executor pods. Set 
this to steer
+    Kubernetes to remove the same pods that Spark selected when the deployment 
scales down.
+  </td>
+  <td>4.2.0</td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.executor.scheduler.name</code></td>
   <td>(none)</td>
@@ -1654,10 +1664,10 @@ See the [configuration page](configuration.html) for 
information on Spark config
   <td><code>spark.kubernetes.allocation.pods.allocator</code></td>
   <td><code>direct</code></td>
   <td>
-    Allocator to use for pods. Possible values are <code>direct</code> (the 
default)
-    and <code>statefulset</code>, or a full class name of a class implementing 
`AbstractPodsAllocator`.
-    Future version may add Job or replicaset. This is a developer API and may 
change
-    or be removed at anytime.
+    Allocator to use for pods. Possible values are <code>direct</code> (the default),
+    <code>statefulset</code>, <code>deployment</code>, or a full class name of a class
+    implementing <code>AbstractPodsAllocator</code>. Future versions may add Job or ReplicaSet.
+    This is a developer API and may change or be removed at any time.
   </td>
   <td>3.3.0</td>
 </tr>
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index fafff5046b9d..1fb214c87c53 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.internal.config.{ConfigBuilder, DYN_ALLOCATION_ENABLED}
 
 private[spark] object Config extends Logging {
 
@@ -462,14 +462,25 @@ private[spark] object Config extends Logging {
 
   val KUBERNETES_ALLOCATION_PODS_ALLOCATOR =
     ConfigBuilder("spark.kubernetes.allocation.pods.allocator")
-      .doc("Allocator to use for pods. Possible values are direct (the 
default) and statefulset " +
-        ", or a full class name of a class implementing AbstractPodsAllocator. 
" +
+      .doc("Allocator to use for pods. Possible values are direct (the 
default), statefulset," +
+        " deployment, or a full class name of a class implementing 
AbstractPodsAllocator. " +
         "Future version may add Job or replicaset. This is a developer API and 
may change " +
       "or be removed at anytime.")
       .version("3.3.0")
       .stringConf
       .createWithDefault("direct")
 
+  val KUBERNETES_EXECUTOR_POD_DELETION_COST =
+    ConfigBuilder("spark.kubernetes.executor.podDeletionCost")
+      .doc("Value to set for the controller.kubernetes.io/pod-deletion-cost" +
+        " annotation when Spark asks a deployment-based allocator to remove 
executor pods. This " +
+        "helps Kubernetes pick the same pods Spark selected when the 
deployment scales down." +
+        s" This should only be enabled when both 
$KUBERNETES_ALLOCATION_PODS_ALLOCATOR is set to " +
+        s"deployment, and $DYN_ALLOCATION_ENABLED is enabled.")
+      .version("4.2.0")
+      .intConf
+      .createOptional
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor 
allocation.")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index 13d1f1bc98a0..5f61c014127a 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s.features
 
+import java.util.Locale
+
 import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model._
@@ -115,12 +117,17 @@ private[spark] class BasicExecutorFeatureStep(
     // hostname must be no longer than 
`KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH`(63) characters,
     // so take the last 63 characters of the pod name as the hostname.
     // This preserves uniqueness since the end of name contains executorId
-    val hostname = name.substring(Math.max(0, name.length - 
KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
+    var hostname = name.substring(Math.max(0, name.length - 
KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
       // Remove non-word characters from the start of the hostname
       .replaceAll("^[^\\w]+", "")
       // Replace dangerous characters in the remaining string with a safe 
alternative.
       .replaceAll("[^\\w-]+", "_")
 
+    // Deployment resource does not support capital characters in the hostname
+    if 
(kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) 
{
+      hostname = hostname.toLowerCase(Locale.ROOT)
+    }
+
     val executorMemoryQuantity = new 
Quantity(s"${execResources.totalMemMiB}Mi")
     val executorCpuQuantity = new Quantity(executorCoresRequest)
     val executorResourceQuantities =
@@ -270,7 +277,7 @@ private[spark] class BasicExecutorFeatureStep(
     }
 
     val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) 
match {
-      case "statefulset" => "Always"
+      case "statefulset" | "deployment" => "Always"
       case _ => "Never"
     }
 
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala
new file mode 100644
index 000000000000..213d12301a9c
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentPodsAllocator.scala
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{Pod, PodSpec, PodSpecBuilder, 
PodTemplateSpec}
+import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.util.{Clock, Utils}
+
+/**
+ * A pods allocator backed by Kubernetes Deployments.
+ *
+ * Deployments manage pods through ReplicaSets, which honour the
+ * `controller.kubernetes.io/pod-deletion-cost` annotation, so executors selected by Spark for
+ * removal can be prioritised when the deployment scales down. This gives dynamic allocation the
+ * predictable downscale behaviour that StatefulSets, which only remove pods in ordinal order, lack.
+ */
+class DeploymentPodsAllocator(
+    conf: SparkConf,
+    secMgr: SecurityManager,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    clock: Clock) extends AbstractPodsAllocator() with Logging {
+
+  private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
+
+  private val driverPodReadinessTimeout = 
conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)
+
+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf.get(KUBERNETES_DRIVER_POD_NAME)
+
+  val driverPod: Option[Pod] = kubernetesDriverPodName
+    .map(name => Option(kubernetesClient.pods()
+      .inNamespace(namespace)
+      .withName(name)
+      .get())
+      .getOrElse(throw new SparkException(
+        s"No pod was found named $name in the cluster in the " +
+          s"namespace $namespace (this was supposed to be the driver pod.).")))
+
+  private var appId: String = _
+
+  private val deploymentsCreated = new mutable.HashSet[Int]()
+
+  private val podDeletionCostAnnotation = 
"controller.kubernetes.io/pod-deletion-cost"
+
+  override def start(
+      applicationId: String,
+      schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    appId = applicationId
+    driverPod.foreach { pod =>
+      Utils.tryLogNonFatalError {
+        kubernetesClient
+          .pods()
+          .inNamespace(namespace)
+          .withName(pod.getMetadata.getName)
+          .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+      }
+    }
+  }
+
+  override def setTotalExpectedExecutors(
+      resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+    if (appId == null) {
+      throw new SparkException("setTotalExpectedExecutors called before start 
of allocator.")
+    }
+    resourceProfileToTotalExecs.foreach { case (rp, numExecs) =>
+      rpIdToResourceProfile.getOrElseUpdate(rp.id, rp)
+      setTargetExecutorsDeployment(numExecs, appId, rp.id)
+    }
+  }
+
+  override def isDeleted(executorId: String): Boolean = false
+
+  private def setName(applicationId: String, resourceProfileId: Int): String = 
{
+    s"spark-d-$applicationId-$resourceProfileId"
+  }
+
+  private def setTargetExecutorsDeployment(
+      expected: Int,
+      applicationId: String,
+      resourceProfileId: Int): Unit = {
+    if (deploymentsCreated.contains(resourceProfileId)) {
+      kubernetesClient
+        .apps()
+        .deployments()
+        .inNamespace(namespace)
+        .withName(setName(applicationId, resourceProfileId))
+        .scale(expected)
+    } else {
+      val executorConf = KubernetesConf.createExecutorConf(
+        conf,
+        "EXECID",
+        applicationId,
+        driverPod,
+        resourceProfileId)
+      val resolvedExecutorSpec = executorBuilder.buildFromFeatures(
+        executorConf,
+        secMgr,
+        kubernetesClient,
+        rpIdToResourceProfile(resourceProfileId))
+      val executorPod = resolvedExecutorSpec.pod
+
+      val podSpecBuilder = executorPod.pod.getSpec match {
+        case null => new PodSpecBuilder()
+        case s => new PodSpecBuilder(s)
+      }
+      val podWithAttachedContainer: PodSpec = podSpecBuilder
+        .addToContainers(executorPod.container)
+        .build()
+
+      val meta = executorPod.pod.getMetadata
+      val resources = resolvedExecutorSpec.executorKubernetesResources
+      val failureMessage =
+        "PersistentVolumeClaims are not supported with the deployment 
allocator. " +
+          "Please remove PVC requirements or choose a different pods 
allocator."
+      val dynamicVolumeClaims = resources.filter(_.getKind == 
"PersistentVolumeClaim")
+      if (dynamicVolumeClaims.nonEmpty) {
+        throw new SparkException(failureMessage)
+      }
+      val staticVolumeClaims = Option(podWithAttachedContainer.getVolumes)
+        .map(_.asScala.filter(_.getPersistentVolumeClaim != null))
+        .getOrElse(Seq.empty)
+      if (staticVolumeClaims.nonEmpty) {
+        throw new SparkException(failureMessage)
+      }
+
+      val currentAnnotations = Option(meta.getAnnotations)
+        .map(_.asScala).getOrElse(Map.empty[String, String])
+      if (!currentAnnotations.contains(podDeletionCostAnnotation)) {
+        val newAnnotations = 
currentAnnotations.concat(Seq(podDeletionCostAnnotation -> "0"))
+        meta.setAnnotations(newAnnotations.asJava)
+      }
+
+      val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer)
+
+      val deployment = new DeploymentBuilder()
+        .withNewMetadata()
+          .withName(setName(applicationId, resourceProfileId))
+          .withNamespace(namespace)
+        .endMetadata()
+        .withNewSpec()
+          .withReplicas(expected)
+          .withNewSelector()
+            .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
+            .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .addToMatchLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, 
resourceProfileId.toString)
+          .endSelector()
+          .withTemplate(podTemplateSpec)
+        .endSpec()
+        .build()
+
+      addOwnerReference(driverPod.get, Seq(deployment))
+      
kubernetesClient.apps().deployments().inNamespace(namespace).resource(deployment).create()
+      deploymentsCreated += resourceProfileId
+    }
+  }
+
+  override def stop(applicationId: String): Unit = {
+    deploymentsCreated.foreach { rpid =>
+      Utils.tryLogNonFatalError {
+        kubernetesClient
+          .apps()
+          .deployments()
+          .inNamespace(namespace)
+          .withName(setName(applicationId, rpid))
+          .delete()
+      }
+    }
+  }
+}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 3fb1ed0c9c0f..15fcfe001a58 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -21,7 +21,7 @@ import java.io.File
 import io.fabric8.kubernetes.client.Config
 import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.{SparkConf, SparkContext, SparkMasterRegex}
+import org.apache.spark.{SparkConf, SparkContext, SparkException, 
SparkMasterRegex}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, 
SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
@@ -160,9 +160,19 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
 
   private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, 
kubernetesClient: KubernetesClient,
       snapshotsStore: ExecutorPodsSnapshotsStore) = {
-    val executorPodsAllocatorName = 
sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
+    val allocator = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
+    if (allocator == "deployment" && Utils.isDynamicAllocationEnabled(sc.conf) 
&&
+      sc.conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).isEmpty) {
+      throw new SparkException(
+        s"Dynamic allocation with the deployment pods allocator requires " +
+          s"'${KUBERNETES_EXECUTOR_POD_DELETION_COST.key}' to be configured.")
+    }
+
+    val executorPodsAllocatorName = allocator match {
       case "statefulset" =>
         classOf[StatefulSetPodsAllocator].getName
+      case "deployment" =>
+        classOf[DeploymentPodsAllocator].getName
       case "direct" =>
         classOf[ExecutorPodsAllocator].getName
       case fullClass =>
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index aacd8b84199e..a6d9d23fb8df 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -73,6 +73,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val namespace = conf.get(KUBERNETES_NAMESPACE)
 
+  // KEP 2255: When a Deployment or ReplicaSet is scaled down, pods are deleted in ascending
+  // order of this annotation's value, so pods with the lowest cost are removed first.
+  private val podDeletionCostAnnotation = 
"controller.kubernetes.io/pod-deletion-cost"
+
   // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: 
ExecutorLossReason): Unit = {
     removeExecutor(executorId, reason)
@@ -195,6 +199,31 @@ private[spark] class KubernetesClusterSchedulerBackend(
     super.getExecutorIds()
   }
 
+  private def annotateExecutorDeletionCost(execIds: Seq[String]): Unit = {
+    conf.get(KUBERNETES_EXECUTOR_POD_DELETION_COST).foreach { cost =>
+      logInfo(s"Annotating executor pod(s) ${execIds.mkString(",")} with 
deletion cost $cost")
+      val annotateTask = new Runnable() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          kubernetesClient
+            .pods()
+            .inNamespace(namespace)
+            .withLabel(SPARK_APP_ID_LABEL, applicationId())
+            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
+            .resources()
+            .forEach { podResource =>
+              podResource.edit({ p: Pod =>
+                new PodBuilder(p).editOrNewMetadata()
+                  .addToAnnotations(podDeletionCostAnnotation, cost.toString)
+                  .endMetadata()
+                  .build()})
+            }
+        }
+      }
+      executorService.execute(annotateTask)
+    }
+  }
+
   private def labelDecommissioningExecs(execIds: Seq[String]) = {
     // Only kick off the labeling task if we have a label.
     conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
@@ -228,6 +257,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     // picked the pod to evict so we don't need to update the labels.
     if (!triggeredByExecutor) {
       
labelDecommissioningExecs(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
+      if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) 
{
+        
annotateExecutorDeletionCost(executorsAndDecomInfo.map(_._1).toImmutableArraySeq)
+      }
     }
     super.decommissionExecutors(executorsAndDecomInfo, 
adjustTargetNumExecutors,
       triggeredByExecutor)
@@ -235,6 +267,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
     // If we've decided to remove some executors we should tell Kubernetes 
that we don't care.
+    if (conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR).equals("deployment")) {
+      annotateExecutorDeletionCost(executorIds)
+    }
     labelDecommissioningExecs(executorIds)
 
     // Tell the executors to exit themselves.
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
index 1a4bc9781da2..86deb3e52bae 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
@@ -17,8 +17,7 @@
 package org.apache.spark.deploy.k8s
 
 import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, 
PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
-import io.fabric8.kubernetes.api.model.apps.StatefulSet
-import io.fabric8.kubernetes.api.model.apps.StatefulSetList
+import io.fabric8.kubernetes.api.model.apps.{Deployment, DeploymentList, 
StatefulSet, StatefulSetList}
 import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, 
MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, 
NonNamespaceOperation, PodResource, Resource, RollableScalableResource}
 
 object Fabric8Aliases {
@@ -38,6 +37,10 @@ object Fabric8Aliases {
   type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, 
STATEFUL_SET_RES]
   type STATEFUL_SETS_NAMESPACED =
     NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
+  type DEPLOYMENT_RES = RollableScalableResource[Deployment]
+  type DEPLOYMENTS = MixedOperation[Deployment, DeploymentList, DEPLOYMENT_RES]
+  type DEPLOYMENTS_NAMESPACED =
+    NonNamespaceOperation[Deployment, DeploymentList, DEPLOYMENT_RES]
   type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, 
PersistentVolumeClaimList,
     Resource[PersistentVolumeClaim]]
   type PVC_WITH_NAMESPACE = NonNamespaceOperation[PersistentVolumeClaim, 
PersistentVolumeClaimList,
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index ced1326e7938..b8b5da192a09 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.deploy.k8s.features
 
+import java.util.Locale
+
 import scala.jdk.CollectionConverters._
 
 import com.google.common.net.InternetDomainName
@@ -26,6 +28,7 @@ import org.apache.spark.{SecurityManager, SparkConf, 
SparkException, SparkFunSui
 import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, 
KubernetesTestConf, SecretVolumeUtils, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
 import 
org.apache.spark.deploy.k8s.features.KubernetesFeaturesTestUtils.TestResourceInformation
 import org.apache.spark.internal.config
 import org.apache.spark.internal.config._
@@ -266,6 +269,24 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite 
with BeforeAndAfter {
     }
   }
 
+  test("deployment allocator uses restartPolicy Always and lowercase 
hostnames") {
+    baseConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+    initDefaultProfile(baseConf)
+    val executorConf = KubernetesConf.createExecutorConf(
+      sparkConf = baseConf,
+      executorId = "EXECID",
+      appId = KubernetesTestConf.APP_ID,
+      driverPod = Some(DRIVER_POD))
+    val step = new BasicExecutorFeatureStep(executorConf, new 
SecurityManager(baseConf),
+      defaultProfile)
+    val executor = step.configurePod(SparkPod.initialPod())
+
+    val hostname = executor.pod.getSpec.getHostname
+    assert(hostname === hostname.toLowerCase(Locale.ROOT))
+    assert(InternetDomainName.isValid(hostname))
+    assert(executor.pod.getSpec.getRestartPolicy === "Always")
+  }
+
   test("classpath and extra java options get translated into environment 
variables") {
     baseConf.set(config.EXECUTOR_JAVA_OPTIONS, "foo=bar")
     baseConf.set(config.EXECUTOR_CLASS_PATH, "bar=baz")
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala
new file mode 100644
index 000000000000..2166cef9d73c
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/DeploymentAllocatorSuite.scala
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.Deployment
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{AppsAPIGroupDSL, PodResource}
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.Mockito.{never, times, verify, when}
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.resource.{ResourceProfile, ResourceProfileBuilder}
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class DeploymentAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val driverPodName = "driver"
+
+  private val driverPod = new PodBuilder()
+    .withNewMetadata()
+      .withName(driverPodName)
+      .withUid("driver-pod-uid")
+      .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+      .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+      .endMetadata()
+    .build()
+
+  private val conf = new SparkConf()
+    .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+    .set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+
+  private val secMgr = new SecurityManager(conf)
+  private val defaultProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
+
+  private val schedulerBackendAppId = "testapp"
+
+  @Mock private var kubernetesClient: KubernetesClient = _
+  @Mock private var appsClient: AppsAPIGroupDSL = _
+  @Mock private var deployments: DEPLOYMENTS = _
+  @Mock private var deploymentsNamespaced: DEPLOYMENTS_NAMESPACED = _
+  @Mock private var deploymentResource: DEPLOYMENT_RES = _
+  @Mock private var pods: PODS = _
+  @Mock private var podsNamespaced: PODS_WITH_NAMESPACE = _
+  @Mock private var driverPodResource: PodResource = _
+  @Mock private var executorPodResource: PodResource = _
+  @Mock private var executorBuilder: KubernetesExecutorBuilder = _
+  @Mock private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+
+  private var allocator: DeploymentPodsAllocator = _
+  private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+
+  before {
+    MockitoAnnotations.openMocks(this).close()
+    when(kubernetesClient.apps()).thenReturn(appsClient)
+    when(appsClient.deployments()).thenReturn(deployments)
+    when(deployments.inNamespace("default")).thenReturn(deploymentsNamespaced)
+    when(deploymentsNamespaced.resource(any(classOf[Deployment]))).thenReturn(deploymentResource)
+    when(deploymentsNamespaced.withName(any[String]())).thenReturn(deploymentResource)
+
+    when(kubernetesClient.pods()).thenReturn(pods)
+    when(pods.inNamespace("default")).thenReturn(podsNamespaced)
+    when(podsNamespaced.withName(driverPodName)).thenReturn(driverPodResource)
+    when(podsNamespaced.resource(any(classOf[Pod]))).thenReturn(executorPodResource)
+    when(driverPodResource.get).thenReturn(driverPod)
+    when(driverPodResource.waitUntilReady(any(), any())).thenReturn(driverPod)
+    when(executorBuilder.buildFromFeatures(
+      any(classOf[KubernetesExecutorConf]),
+      meq(secMgr),
+      meq(kubernetesClient),
+      any(classOf[ResourceProfile])))
+      .thenAnswer { invocation =>
+        val k8sConf = invocation.getArgument[KubernetesExecutorConf](0)
+        KubernetesExecutorSpec(
+          executorPodWithId(0, k8sConf.resourceProfileId),
+          Seq.empty)
+      }
+
+    snapshotsStore = new DeterministicExecutorPodsSnapshotsStore
+    allocator = new DeploymentPodsAllocator(
+      conf,
+      secMgr,
+      executorBuilder,
+      kubernetesClient,
+      snapshotsStore,
+      snapshotsStore.clock)
+
+    when(schedulerBackend.getExecutorIds()).thenReturn(Seq.empty)
+    allocator.start(TEST_SPARK_APP_ID, schedulerBackend)
+  }
+
+  after {
+    ResourceProfile.clearDefaultProfile()
+  }
+
+  test("creates deployments per resource profile and seeds deletion cost annotation") {
+    val rpBuilder = new ResourceProfileBuilder()
+    val secondProfile = rpBuilder.build()
+
+    allocator.setTotalExpectedExecutors(
+      Map(defaultProfile -> 3, secondProfile -> 2))
+
+    val captor = ArgumentCaptor.forClass(classOf[Deployment])
+    verify(deploymentsNamespaced, times(2)).resource(captor.capture())
+    verify(deploymentResource, times(2)).create()
+
+    val createdDeployments = captor.getAllValues.asScala
+    createdDeployments.foreach { deployment =>
+      assert(deployment.getMetadata.getNamespace === "default")
+      assert(deployment.getSpec.getTemplate.getMetadata
+        .getAnnotations.get("controller.kubernetes.io/pod-deletion-cost") === "0")
+      assert(deployment.getSpec.getTemplate.getSpec.getContainers.asScala.exists(
+        _.getName == "spark-executor"))
+      val selectorLabels = deployment.getSpec.getSelector.getMatchLabels.asScala
+      assert(selectorLabels(SPARK_APP_ID_LABEL) === TEST_SPARK_APP_ID)
+      assert(selectorLabels(SPARK_ROLE_LABEL) === SPARK_POD_EXECUTOR_ROLE)
+    }
+  }
+
+  test("scales existing deployment when replicas change") {
+    allocator.setTotalExpectedExecutors(Map(defaultProfile -> 5))
+    verify(deploymentResource, times(1)).create()
+
+    allocator.setTotalExpectedExecutors(Map(defaultProfile -> 7))
+    verify(deploymentResource).scale(7)
+  }
+
+  test("throws when executor template contributes dynamic PVCs") {
+    val pvc = persistentVolumeClaim("spark-pvc", "standard", "1Gi")
+    when(executorBuilder.buildFromFeatures(
+      any(classOf[KubernetesExecutorConf]),
+      meq(secMgr),
+      meq(kubernetesClient),
+      any(classOf[ResourceProfile])))
+      .thenReturn(KubernetesExecutorSpec(
+        executorPodWithId(0),
+        Seq(pvc)))
+
+    val error = intercept[SparkException] {
+      allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    }
+    assert(error.getMessage.contains("PersistentVolumeClaims are not supported"))
+    verify(deploymentResource, never()).create()
+  }
+
+  test("throws when executor template includes static PVC references") {
+    when(executorBuilder.buildFromFeatures(
+      any(classOf[KubernetesExecutorConf]),
+      meq(secMgr),
+      meq(kubernetesClient),
+      any(classOf[ResourceProfile])))
+      .thenReturn(KubernetesExecutorSpec(
+        executorPodWithIdAndVolume(0),
+        Seq.empty))
+
+    val error = intercept[SparkException] {
+      allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    }
+    assert(error.getMessage.contains("PersistentVolumeClaims are not supported"))
+    verify(deploymentResource, never()).create()
+  }
+
+  test("deletes deployments on stop") {
+    allocator.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    allocator.stop(schedulerBackendAppId)
+    verify(deploymentResource).delete()
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
index 07410b6a7b71..78e0942cfb82 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 
 class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
@@ -39,27 +40,33 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var env: SparkEnv = _
 
-  @Mock
   private var sparkConf: SparkConf = _
 
   before {
     MockitoAnnotations.openMocks(this).close()
+    sparkConf = new SparkConf(false)
+      .set("spark.app.id", TEST_SPARK_APP_ID)
+      .set("spark.master", "k8s://test")
     when(sc.conf).thenReturn(sparkConf)
-    when(sc.conf.get(KUBERNETES_DRIVER_POD_NAME)).thenReturn(None)
-    when(sc.conf.get(EXECUTOR_INSTANCES)).thenReturn(None)
-    when(sc.conf.get(MAX_EXECUTOR_FAILURES)).thenReturn(None)
-    when(sc.conf.get(EXECUTOR_ATTEMPT_FAILURE_VALIDITY_INTERVAL_MS)).thenReturn(None)
     when(sc.env).thenReturn(env)
+    when(env.securityManager).thenReturn(new SecurityManager(sparkConf))
+    resetDynamicAllocatorConfig()
+  }
+
+  after {
+    resetDynamicAllocatorConfig()
   }
 
   test("constructing a AbstractPodsAllocator works") {
-    val validConfigs = List("statefulset", "direct",
+    val validConfigs = List("statefulset", "deployment", "direct",
       classOf[StatefulSetPodsAllocator].getName,
+      classOf[DeploymentPodsAllocator].getName,
       classOf[ExecutorPodsAllocator].getName)
     validConfigs.foreach { c =>
       val manager = new KubernetesClusterManager()
-      when(sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)).thenReturn(c)
+      sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, c)
       manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+      sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
     }
   }
 
@@ -80,4 +87,34 @@ class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(backend2.isInstanceOf[LocalSchedulerBackend])
     assert(backend2.applicationId() === "user-app-id")
   }
+
+  test("deployment allocator with dynamic allocation requires deletion cost") {
+    val manager = new KubernetesClusterManager()
+    sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+    sparkConf.set(DYN_ALLOCATION_ENABLED.key, "true")
+    sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+
+    val e = intercept[SparkException] {
+      manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+    }
+    assert(e.getMessage.contains(KUBERNETES_EXECUTOR_POD_DELETION_COST.key))
+  }
+
+  test("deployment allocator with dynamic allocation and deletion cost succeeds") {
+    val manager = new KubernetesClusterManager()
+    sparkConf.set(KUBERNETES_ALLOCATION_PODS_ALLOCATOR, "deployment")
+    sparkConf.set(DYN_ALLOCATION_ENABLED.key, "true")
+    sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 1)
+    sparkConf.set("spark.shuffle.service.enabled", "true")
+
+    manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+  }
+
+  private def resetDynamicAllocatorConfig(): Unit = {
+    sparkConf.remove(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)
+    sparkConf.remove(DYN_ALLOCATION_ENABLED.key)
+    sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+    sparkConf.remove("spark.shuffle.service.enabled")
+  }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index b2e4a7182a77..76c2e16782c1 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -18,14 +18,17 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.Arrays
 import java.util.concurrent.TimeUnit
+import java.util.function.UnaryOperator
 
-import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodList}
+import scala.jdk.CollectionConverters._
+
+import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodBuilder, PodList}
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.jmock.lib.concurrent.DeterministicScheduler
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
-import org.mockito.Mockito.{mock, never, spy, verify, when}
+import org.mockito.Mockito.{atLeastOnce, mock, never, spy, verify, when}
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkFunSuite}
@@ -205,17 +208,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
     verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))
     verify(labeledPods, never()).delete()
-    verify(pod1op, never()).edit(any(
-      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
-    verify(pod2op, never()).edit(any(
-      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod1op, never()).edit(any(classOf[UnaryOperator[Pod]]))
+    verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods, never()).delete()
-    verify(pod1op, never()).edit(any(
-      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
-    verify(pod2op, never()).edit(any(
-      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod1op, never()).edit(any(classOf[UnaryOperator[Pod]]))
+    verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
 
     when(labeledPods.resources()).thenReturn(Arrays.asList(pod1op).stream)
     val podList = mock(classOf[PodList])
@@ -227,16 +226,64 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(labeledPods, never()).delete()
     schedulerExecutorService.runUntilIdle()
-    verify(pod1op).edit(any(
-      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
-    verify(pod2op, never()).edit(any(
-      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod1op).edit(any(classOf[UnaryOperator[Pod]]))
+    verify(pod2op, never()).edit(any(classOf[UnaryOperator[Pod]]))
     verify(labeledPods, never()).delete()
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods).delete()
   }
 
+  test("Annotates executor pods with deletion cost when configured") {
+    sparkConf.set(KUBERNETES_EXECUTOR_POD_DELETION_COST, 7)
+    schedulerBackendUnderTest.start()
+
+    when(podsWithNamespace.withField(any(), any())).thenReturn(labeledPods)
+    when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
+    when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3")).thenReturn(labeledPods)
+
+    val podResource = mock(classOf[PodResource])
+    val basePod = new PodBuilder()
+      .withNewMetadata()
+        .withName("exec-3")
+        .withNamespace("default")
+        .endMetadata()
+      .build()
+
+    val editCaptor = ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
+    when(podResource.edit(any(classOf[UnaryOperator[Pod]]))).thenAnswer { invocation =>
+      val fn = invocation.getArgument[UnaryOperator[Pod]](0)
+      fn.apply(basePod)
+    }
+
+    when(labeledPods.resources())
+      .thenAnswer(_ => java.util.stream.Stream.of[PodResource](podResource))
+
+    val method = classOf[KubernetesClusterSchedulerBackend]
+      .getDeclaredMethods
+      .find(_.getName == "annotateExecutorDeletionCost")
+      .get
+    method.setAccessible(true)
+    method.invoke(schedulerBackendUnderTest, Seq("3"))
+    schedulerExecutorService.runUntilIdle()
+
+    verify(podResource, atLeastOnce()).edit(editCaptor.capture())
+    val appliedPods = editCaptor.getAllValues.asScala
+      .scanLeft(basePod)((pod, fn) => fn.apply(pod))
+      .tail
+    val annotated = appliedPods
+      .find(_.getMetadata.getAnnotations.asScala
+        .contains("controller.kubernetes.io/pod-deletion-cost"))
+    assert(annotated.isDefined,
+      s"expected controller.kubernetes.io/pod-deletion-cost annotation, got annotations " +
+        s"${appliedPods.map(_.getMetadata.getAnnotations).asJava}")
+    val annotations = annotated.get.getMetadata.getAnnotations.asScala
+    assert(annotations("controller.kubernetes.io/pod-deletion-cost") === "7")
+    sparkConf.remove(KUBERNETES_EXECUTOR_POD_DELETION_COST.key)
+  }
+
   test("SPARK-34407: CoarseGrainedSchedulerBackend.stop may throw SparkException") {
     schedulerBackendUnderTest.start()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to