This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bee2799  [SPARK-35956][K8S] Support auto assigning labels to decommissioning pods
bee2799 is described below

commit bee279997f2115af6b15e3dbb7433dccef7f14af
Author: Holden Karau <hka...@netflix.com>
AuthorDate: Fri Jul 23 15:21:38 2021 -0700

    [SPARK-35956][K8S] Support auto assigning labels to decommissioning pods
    
    ### What changes were proposed in this pull request?
    
    Add a new configuration flag that lets Spark hint to the scheduler, when we are decommissioning or exiting a pod, that this pod will have the least impact in a pre-emption event.
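    
    For example, a job could opt in along these lines (a minimal sketch, not part of this patch; the label value shown is arbitrary, only the two configuration keys come from this change):
    
    ```scala
    import org.apache.spark.SparkConf
    
    object DecommissionLabelExample {
      // Hypothetical label value; only the configuration keys below are added by this patch.
      val conf: SparkConf = new SparkConf()
        .set("spark.kubernetes.executor.decommmissionLabel", "spark-decommissioning")
        .set("spark.kubernetes.executor.decommmissionLabelValue", "true")
    }
    ```
    
    With these set, KubernetesClusterSchedulerBackend adds the label to executor pods as they are killed or decommissioned (see the diff below).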
    
    ### Why are the changes needed?
    
    Kubernetes added the concept of pod disruption budgets (which can have selectors based on labels) as well as pod deletion cost, both of which provide hints to the scheduler as to what we would prefer to have pre-empted.
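    
    As an illustration of how the label is meant to pair with a pod disruption budget (a hedged sketch, not part of this patch: the PDB name, the label key and value, and the exact fabric8 policy package are assumptions that depend on your client version and cluster policy), the budget's selector can match the label Spark applies:
    
    ```scala
    import io.fabric8.kubernetes.api.model.IntOrString
    import io.fabric8.kubernetes.api.model.policy.v1.PodDisruptionBudgetBuilder
    
    object DecommissionPdbSketch {
      // Builds (but does not create) a PodDisruptionBudget whose selector matches the
      // decommission label from the example above; "spark-exec-pdb" and the label
      // key/value are hypothetical.
      val pdb = new PodDisruptionBudgetBuilder()
        .withNewMetadata()
          .withName("spark-exec-pdb")
        .endMetadata()
        .withNewSpec()
          .withMaxUnavailable(new IntOrString(1))
          .withNewSelector()
            .addToMatchLabels("spark-decommissioning", "true")
          .endSelector()
        .endSpec()
        .build()
    }
    ```
    
    Exactly how (or whether) a disruption budget, pod-deletion-cost, or a custom controller consumes the label is left to the cluster operator; this patch only ensures the label is present by the time the pod is being torn down.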
    
    ### Does this PR introduce _any_ user-facing change?
    
    New configuration flags: `spark.kubernetes.executor.decommmissionLabel` and `spark.kubernetes.executor.decommmissionLabelValue`.
    
    ### How was this patch tested?
    
    The existing pod deletion unit test was extended, and the K8s decommissioning integration test now verifies that the label is applied.
    
    Closes #33270 from holdenk/SPARK-35956-support-auto-assigning-labels-to-decommissioning-pods.
    
    Lead-authored-by: Holden Karau <hka...@netflix.com>
    Co-authored-by: Holden Karau <hol...@pigscanfly.ca>
    Co-authored-by: Holden Karau <hka...@apple.com>
    Signed-off-by: Holden Karau <hka...@netflix.com>
---
 docs/running-on-kubernetes.md                      | 28 +++++++++---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 18 ++++++++
 .../k8s/KubernetesClusterSchedulerBackend.scala    | 50 +++++++++++++++++++++-
 .../KubernetesClusterSchedulerBackendSuite.scala   | 47 +++++++++++++++++++-
 .../k8s/integrationtest/DecommissionSuite.scala    | 40 ++++++++++++++---
 5 files changed, 168 insertions(+), 15 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 09f7d2ab..b30d61d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -8,9 +8,9 @@ license: |
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
- 
+
      http://www.apache.org/licenses/LICENSE-2.0
- 
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -422,7 +422,7 @@ Your Kubernetes config file typically lives under `.kube/config` in your home di
 
 ### Contexts
 
-Kubernetes configuration files can contain multiple contexts that allow for switching between different clusters and/or user identities.  By default Spark on Kubernetes will use your current context (which can be checked by running `kubectl config current-context`) when doing the initial auto-configuration of the Kubernetes client.  
+Kubernetes configuration files can contain multiple contexts that allow for switching between different clusters and/or user identities.  By default Spark on Kubernetes will use your current context (which can be checked by running `kubectl config current-context`) when doing the initial auto-configuration of the Kubernetes client.
 
 In order to use an alternative context users can specify the desired context via the Spark configuration property `spark.kubernetes.context` e.g. `spark.kubernetes.context=minikube`.
 
@@ -1038,7 +1038,7 @@ See the [configuration page](configuration.html) for information on Spark config
    <code>spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key</code>.
   </td>
   <td>2.4.0</td>
-</tr>   
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path</code></td>
   <td>(none)</td>
@@ -1270,7 +1270,7 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.0.0</td>
 </tr>
-<tr>  
+<tr>
   <td><code>spark.kubernetes.appKillPodDeletionGracePeriod</code></td>
   <td>(none)</td>
   <td>
@@ -1288,6 +1288,24 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.0.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.decommmissionLabel</code></td>
+  <td>(none)</td>
+  <td>
+    Label to be applied to pods which are exiting or being decommissioned. Intended for use
+    with pod disruption budgets, deletion costs, and similar mechanisms.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.decommmissionLabelValue</code></td>
+  <td>(none)</td>
+  <td>
+    Value to be applied with the label when
+    <code>spark.kubernetes.executor.decommmissionLabel</code> is enabled.
+  </td>
+  <td>3.3.0</td>
+</tr>
 </table>
 
 #### Pod template properties
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 49c0a42..33370b7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -305,6 +305,24 @@ private[spark] object Config extends Logging {
       .toSequence
       .createWithDefault(Nil)
 
+  val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL =
+    ConfigBuilder("spark.kubernetes.executor.decommmissionLabel")
+      .doc("Label to apply to a pod which is being decommissioned." +
+        " Designed for use with pod disruption budgets and similar mechanism" +
+        " such as pod-deletion-cost.")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE =
+    ConfigBuilder("spark.kubernetes.executor.decommmissionLabelValue")
+      .doc("Label value to apply to a pod which is being decommissioned." +
+        " Designed for use with pod disruption budgets and similar mechanism" +
+        " such as pod-deletion-cost.")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor 
allocation.")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 42a9300..25f6851 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -18,9 +18,11 @@ package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 
+import scala.collection.JavaConverters._
 import scala.concurrent.Future
 
 import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.api.model.PodBuilder
 import io.fabric8.kubernetes.client.KubernetesClient
 
 import org.apache.spark.SparkContext
@@ -32,7 +34,8 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.rpc.{RpcAddress, RpcCallContext}
-import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl}
+import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorKilled, ExecutorLossReason,
+  TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -182,7 +185,52 @@ private[spark] class KubernetesClusterSchedulerBackend(
     super.getExecutorIds()
   }
 
+  private def labelDecommissioningExecs(execIds: Seq[String]) = {
+    // Only kick off the labeling task if we have a label.
+    conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
+      val labelTask = new Runnable() {
+        override def run(): Unit = Utils.tryLogNonFatalError {
+
+          val podsToLabel = kubernetesClient.pods()
+            .withLabel(SPARK_APP_ID_LABEL, applicationId())
+            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
+            .list().getItems().asScala
+
+          podsToLabel.foreach { pod =>
+            kubernetesClient.pods()
+              .inNamespace(pod.getMetadata.getNamespace)
+              .withName(pod.getMetadata.getName)
+              .edit({p: Pod => new PodBuilder(p).editMetadata()
+                .addToLabels(label,
+                  conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse(""))
+                .endMetadata()
+                .build()})
+          }
+        }
+      }
+      executorService.execute(labelTask)
+    }
+  }
+
+  override def decommissionExecutors(
+      executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
+      adjustTargetNumExecutors: Boolean,
+      triggeredByExecutor: Boolean): Seq[String] = {
+    // If decommissioning is triggered by the executor the K8s cluster manager has already
+    // picked the pod to evict so we don't need to update the labels.
+    if (!triggeredByExecutor) {
+      labelDecommissioningExecs(executorsAndDecomInfo.map(_._1))
+    }
+    super.decommissionExecutors(executorsAndDecomInfo, adjustTargetNumExecutors,
+      triggeredByExecutor)
+  }
+
   override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = {
+    // If we've decided to remove some executors we should tell Kubernetes that we don't care.
+    labelDecommissioningExecs(executorIds)
+
+    // Tell the executors to exit themselves.
     executorIds.foreach { id =>
       removeExecutor(id, ExecutorKilled)
     }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 5dd84e8..bf17aa3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.Arrays
 import java.util.concurrent.TimeUnit
 
-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod, PodList}
 import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
 import org.jmock.lib.concurrent.DeterministicScheduler
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
@@ -44,6 +45,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private val sparkConf = new SparkConf(false)
     .set("spark.executor.instances", "3")
     .set("spark.app.id", TEST_SPARK_APP_ID)
+    .set("spark.kubernetes.executor.decommmissionLabel", "soLong")
+    .set("spark.kubernetes.executor.decommmissionLabelValue", "cruelWorld")
 
   @Mock
   private var sc: SparkContext = _
@@ -166,26 +169,66 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
 
   test("Kill executors") {
     schedulerBackendUnderTest.start()
+
+    val operation = mock(classOf[NonNamespaceOperation[
+      Pod, PodList, PodResource[Pod]]])
+
+    when(podOperations.inNamespace(any())).thenReturn(operation)
     when(podOperations.withField(any(), any())).thenReturn(labeledPods)
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
     when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
 
+    val pod1 = mock(classOf[Pod])
+    val pod1Metadata = mock(classOf[ObjectMeta])
+    when(pod1Metadata.getNamespace).thenReturn("coffeeIsLife")
+    when(pod1Metadata.getName).thenReturn("pod1")
+    when(pod1.getMetadata).thenReturn(pod1Metadata)
+
+    val pod2 = mock(classOf[Pod])
+    val pod2Metadata = mock(classOf[ObjectMeta])
+    when(pod2Metadata.getNamespace).thenReturn("coffeeIsLife")
+    when(pod2Metadata.getName).thenReturn("pod2")
+    when(pod2.getMetadata).thenReturn(pod2Metadata)
+
+    val pod1op = mock(classOf[PodResource[Pod]])
+    val pod2op = mock(classOf[PodResource[Pod]])
+    when(operation.withName("pod1")).thenReturn(pod1op)
+    when(operation.withName("pod2")).thenReturn(pod2op)
+
     val podList = mock(classOf[PodList])
     when(labeledPods.list()).thenReturn(podList)
     when(podList.getItems()).thenReturn(Arrays.asList[Pod]())
+    schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
+      TimeUnit.MILLISECONDS)
+    verify(labeledPods, never()).delete()
 
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
     verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))
     verify(labeledPods, never()).delete()
+    verify(pod1op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods, never()).delete()
+    verify(pod1op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
 
-    when(podList.getItems()).thenReturn(Arrays.asList(mock(classOf[Pod])))
+    when(podList.getItems()).thenReturn(Arrays.asList(pod1))
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(labeledPods, never()).delete()
+    schedulerExecutorService.runUntilIdle()
+    verify(pod1op).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(labeledPods, never()).delete()
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods).delete()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 75c27f6..1250126 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -16,9 +16,12 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import org.scalatest.concurrent.PatienceConfiguration
-import org.scalatest.time.Minutes
-import org.scalatest.time.Span
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Pod
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.matchers.should.Matchers._
+import org.scalatest.time.{Minutes, Seconds, Span}
 
 import org.apache.spark.internal.config
 
@@ -98,10 +101,33 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
       .set(config.DYN_ALLOCATION_MIN_EXECUTORS.key, "1")
       .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS.key, "2")
       .set(config.DYN_ALLOCATION_ENABLED.key, "true")
-      // The default of 30 seconds is fine, but for testing we just want to get this done fast.
-      .set("spark.storage.decommission.replicationReattemptInterval", "1")
+      // The default of 30 seconds is fine, but for testing we just want to
+      // give enough time to validate the labels are set.
+      .set("spark.storage.decommission.replicationReattemptInterval", "75")
+      // Configure labels for decommissioning pods.
+      .set("spark.kubernetes.executor.decommmissionLabel", "solong")
+      .set("spark.kubernetes.executor.decommmissionLabelValue", "cruelworld")
 
-    var execLogs: String = ""
+    // This is called on all exec pods but we only care about exec 0 since it's the "first."
+    // We only do this inside of this test since the other tests trigger k8s side deletes where we
+    // do not apply labels.
+    def checkFirstExecutorPodGetsLabeled(pod: Pod): Unit = {
+      if (pod.getMetadata.getName.endsWith("-1")) {
+        val client = kubernetesTestComponents.kubernetesClient
+        // The label will be added eventually, but k8s objects don't refresh.
+        Eventually.eventually(
+          PatienceConfiguration.Timeout(Span(1200, Seconds)),
+          PatienceConfiguration.Interval(Span(1, Seconds))) {
+
+          val currentPod = client.pods().withName(pod.getMetadata.getName).get
+          val labels = currentPod.getMetadata.getLabels.asScala
+
+          labels should not be (null)
+          labels should (contain key ("solong") and contain value ("cruelworld"))
+        }
+      }
+      doBasicExecutorPyPodCheck(pod)
+    }
 
     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_SCALE,
@@ -113,7 +139,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
           "driver killed: 0, unexpectedly exited: 0)."),
       appArgs = Array.empty[String],
       driverPodChecker = doBasicDriverPyPodCheck,
-      executorPodChecker = doBasicExecutorPyPodCheck,
+      executorPodChecker = checkFirstExecutorPodGetsLabeled,
       isJVM = false,
       pyFiles = None,
       executorPatience = Some(None, Some(DECOMMISSIONING_FINISHED_TIMEOUT)),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
