This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new bee2799  [SPARK-35956][K8S] Support auto assigning labels to decommissioning pods
bee2799 is described below

commit bee279997f2115af6b15e3dbb7433dccef7f14af
Author: Holden Karau <hka...@netflix.com>
AuthorDate: Fri Jul 23 15:21:38 2021 -0700

    [SPARK-35956][K8S] Support auto assigning labels to decommissioning pods

    ### What changes were proposed in this pull request?

    Add a new configuration flag that allows Spark to provide hints to the scheduler, when we are
    decommissioning or exiting a pod, that this pod will have the least impact for a pre-emption event.

    ### Why are the changes needed?

    Kubernetes added the concepts of pod disruption budgets (which can have selectors based on labels)
    as well as pod deletion for providing hints to the scheduler as to what we would prefer to have
    pre-empted.

    ### Does this PR introduce _any_ user-facing change?

    New configuration flag.

    ### How was this patch tested?

    The deletion unit test was extended.

    Closes #33270 from holdenk/SPARK-35956-support-auto-assigning-labels-to-decommissioning-pods.

    Lead-authored-by: Holden Karau <hka...@netflix.com>
    Co-authored-by: Holden Karau <hol...@pigscanfly.ca>
    Co-authored-by: Holden Karau <hka...@apple.com>
    Signed-off-by: Holden Karau <hka...@netflix.com>
---
 docs/running-on-kubernetes.md                      | 28 +++++++++---
 .../scala/org/apache/spark/deploy/k8s/Config.scala | 18 ++++++++
 .../k8s/KubernetesClusterSchedulerBackend.scala    | 50 +++++++++++++++++++++-
 .../KubernetesClusterSchedulerBackendSuite.scala   | 47 +++++++++++++++++++-
 .../k8s/integrationtest/DecommissionSuite.scala    | 40 ++++++++++++++---
 5 files changed, 168 insertions(+), 15 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 09f7d2ab..b30d61d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -8,9 +8,9 @@ license: |
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at
-
+
      http://www.apache.org/licenses/LICENSE-2.0
-
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -422,7 +422,7 @@ Your Kubernetes config file typically lives under `.kube/config` in your home di

 ### Contexts

-Kubernetes configuration files can contain multiple contexts that allow for switching between different clusters and/or user identities. By default Spark on Kubernetes will use your current context (which can be checked by running `kubectl config current-context`) when doing the initial auto-configuration of the Kubernetes client.
+Kubernetes configuration files can contain multiple contexts that allow for switching between different clusters and/or user identities. By default Spark on Kubernetes will use your current context (which can be checked by running `kubectl config current-context`) when doing the initial auto-configuration of the Kubernetes client.

 In order to use an alternative context users can specify the desired context via the Spark configuration property `spark.kubernetes.context` e.g. `spark.kubernetes.context=minikube`.

@@ -1038,7 +1038,7 @@ See the [configuration page](configuration.html) for information on Spark config
   <code>spark.kubernetes.executor.secrets.ENV_VAR=spark-secret:key</code>.
   </td>
   <td>2.4.0</td>
-</tr>
+</tr>
 <tr>
   <td><code>spark.kubernetes.driver.volumes.[VolumeType].[VolumeName].mount.path</code></td>
   <td>(none)</td>
@@ -1270,7 +1270,7 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.0.0</td>
 </tr>
-<tr>
+<tr>
   <td><code>spark.kubernetes.appKillPodDeletionGracePeriod</code></td>
   <td>(none)</td>
   <td>
@@ -1288,6 +1288,24 @@ See the [configuration page](configuration.html) for information on Spark config
   </td>
   <td>3.0.0</td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.decommmissionLabel</code></td>
+  <td>(none)</td>
+  <td>
+    Label to be applied to pods which are exiting or being decommissioned. Intended for use
+    with pod disruption budgets, deletion costs, and similar.
+  </td>
+  <td>3.3.0</td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.executor.decommmissionLabelValue</code></td>
+  <td>(none)</td>
+  <td>
+    Value to be applied with the label when
+    <code>spark.kubernetes.executor.decommmissionLabel</code> is enabled.
+  </td>
+  <td>3.3.0</td>
+</tr>
 </table>

 #### Pod template properties
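As an illustration of the user-facing side documented above, a minimal sketch (not part of the patch) of how a job could opt in. The property keys are the ones added by this commit, including the shipped spelling with three "m"s; the label name "spark-decommissioning" and value "true" are hypothetical placeholders a deployment would choose itself.

import org.apache.spark.SparkConf

// Hypothetical opt-in: the property keys below come from this commit;
// the label key/value chosen here are illustrative placeholders.
val conf = new SparkConf()
  .set("spark.kubernetes.executor.decommmissionLabel", "spark-decommissioning")
  .set("spark.kubernetes.executor.decommmissionLabelValue", "true")
// A pod disruption budget (or a pod-deletion-cost controller) can then select pods
// carrying spark-decommissioning=true and prefer them for eviction.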
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 49c0a42..33370b7 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -305,6 +305,24 @@ private[spark] object Config extends Logging {
       .toSequence
       .createWithDefault(Nil)

+  val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL =
+    ConfigBuilder("spark.kubernetes.executor.decommmissionLabel")
+      .doc("Label to apply to a pod which is being decommissioned." +
+        " Designed for use with pod disruption budgets and similar mechanism" +
+        " such as pod-deletion-cost.")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE =
+    ConfigBuilder("spark.kubernetes.executor.decommmissionLabelValue")
+      .doc("Label value to apply to a pod which is being decommissioned." +
+        " Designed for use with pod disruption budgets and similar mechanism" +
+        " such as pod-deletion-cost.")
+      .version("3.3.0")
+      .stringConf
+      .createOptional
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor allocation.")
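Both entries are created with createOptional, so callers see an Option[String]. A small sketch of the resolution semantics the backend relies on, including the empty-string fallback when only the label key is set; the helper name is hypothetical and the public getOption API is used here instead of the internal ConfigEntry accessor so the snippet stands alone.

import org.apache.spark.SparkConf

// Sketch of how the two optional settings combine: no label key => no labeling at all,
// a label key without a configured value => the label is applied with an empty string value.
def resolveDecommissionLabel(conf: SparkConf): Option[(String, String)] = {
  conf.getOption("spark.kubernetes.executor.decommmissionLabel").map { key =>
    key -> conf.getOption("spark.kubernetes.executor.decommmissionLabelValue").getOrElse("")
  }
}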
+ + " Designed for use with pod disruption budgets and similar mechanism" + + " such as pod-deletion-cost.") + .version("3.3.0") + .stringConf + .createOptional + val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 42a9300..25f6851 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -18,9 +18,11 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import scala.collection.JavaConverters._ import scala.concurrent.Future import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.api.model.PodBuilder import io.fabric8.kubernetes.client.KubernetesClient import org.apache.spark.SparkContext @@ -32,7 +34,8 @@ import org.apache.spark.deploy.security.HadoopDelegationTokenManager import org.apache.spark.internal.config.SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO import org.apache.spark.resource.ResourceProfile import org.apache.spark.rpc.{RpcAddress, RpcCallContext} -import org.apache.spark.scheduler.{ExecutorKilled, ExecutorLossReason, TaskSchedulerImpl} +import org.apache.spark.scheduler.{ExecutorDecommissionInfo, ExecutorKilled, ExecutorLossReason, + TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RegisterExecutor import org.apache.spark.util.{ThreadUtils, Utils} @@ -182,7 +185,52 @@ private[spark] class KubernetesClusterSchedulerBackend( super.getExecutorIds() } + private def labelDecommissioningExecs(execIds: Seq[String]) = { + // Only kick off the labeling task if we have a label. + conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label => + val labelTask = new Runnable() { + override def run(): Unit = Utils.tryLogNonFatalError { + + val podsToLabel = kubernetesClient.pods() + .withLabel(SPARK_APP_ID_LABEL, applicationId()) + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*) + .list().getItems().asScala + + podsToLabel.foreach { pod => + kubernetesClient.pods() + .inNamespace(pod.getMetadata.getNamespace) + .withName(pod.getMetadata.getName) + .edit({p: Pod => new PodBuilder(p).editMetadata() + .addToLabels(label, + conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse("")) + .endMetadata() + .build()}) + } + } + } + executorService.execute(labelTask) + } + } + + override def decommissionExecutors( + executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)], + adjustTargetNumExecutors: Boolean, + triggeredByExecutor: Boolean): Seq[String] = { + // If decommissioning is triggered by the executor the K8s cluster manager has already + // picked the pod to evict so we don't need to update the labels. 
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 5dd84e8..bf17aa3 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -19,8 +19,9 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.Arrays
 import java.util.concurrent.TimeUnit

-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod, PodList}
 import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
 import org.jmock.lib.concurrent.DeterministicScheduler
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
@@ -44,6 +45,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   private val sparkConf = new SparkConf(false)
     .set("spark.executor.instances", "3")
     .set("spark.app.id", TEST_SPARK_APP_ID)
+    .set("spark.kubernetes.executor.decommmissionLabel", "soLong")
+    .set("spark.kubernetes.executor.decommmissionLabelValue", "cruelWorld")

   @Mock
   private var sc: SparkContext = _
@@ -166,26 +169,66 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn

   test("Kill executors") {
     schedulerBackendUnderTest.start()
+
+    val operation = mock(classOf[NonNamespaceOperation[
+      Pod, PodList, PodResource[Pod]]])
+
+    when(podOperations.inNamespace(any())).thenReturn(operation)
     when(podOperations.withField(any(), any())).thenReturn(labeledPods)
+    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
     when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
+
+    val pod1 = mock(classOf[Pod])
+    val pod1Metadata = mock(classOf[ObjectMeta])
+    when(pod1Metadata.getNamespace).thenReturn("coffeeIsLife")
+    when(pod1Metadata.getName).thenReturn("pod1")
+    when(pod1.getMetadata).thenReturn(pod1Metadata)
+
+    val pod2 = mock(classOf[Pod])
+    val pod2Metadata = mock(classOf[ObjectMeta])
+    when(pod2Metadata.getNamespace).thenReturn("coffeeIsLife")
+    when(pod2Metadata.getName).thenReturn("pod2")
+    when(pod2.getMetadata).thenReturn(pod2Metadata)
+
+    val pod1op = mock(classOf[PodResource[Pod]])
+    val pod2op = mock(classOf[PodResource[Pod]])
+    when(operation.withName("pod1")).thenReturn(pod1op)
+    when(operation.withName("pod2")).thenReturn(pod2op)
+
     val podList = mock(classOf[PodList])
     when(labeledPods.list()).thenReturn(podList)
     when(podList.getItems()).thenReturn(Arrays.asList[Pod]())
+    schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
+      TimeUnit.MILLISECONDS)
+    verify(labeledPods, never()).delete()

     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(driverEndpointRef).send(RemoveExecutor("1", ExecutorKilled))
     verify(driverEndpointRef).send(RemoveExecutor("2", ExecutorKilled))

     verify(labeledPods, never()).delete()
+    verify(pod1op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods, never()).delete()
+    verify(pod1op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))

-    when(podList.getItems()).thenReturn(Arrays.asList(mock(classOf[Pod])))
+    when(podList.getItems()).thenReturn(Arrays.asList(pod1))
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(labeledPods, never()).delete()
+    schedulerExecutorService.runUntilIdle()
+    verify(pod1op).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(pod2op, never()).edit(any(
+      classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
+    verify(labeledPods, never()).delete()
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods).delete()
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
index 75c27f6..1250126 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DecommissionSuite.scala
@@ -16,9 +16,12 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest

-import org.scalatest.concurrent.PatienceConfiguration
-import org.scalatest.time.Minutes
-import org.scalatest.time.Span
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.Pod
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.matchers.should.Matchers._
+import org.scalatest.time.{Minutes, Seconds, Span}

 import org.apache.spark.internal.config

@@ -98,10 +101,33 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
       .set(config.DYN_ALLOCATION_MIN_EXECUTORS.key, "1")
       .set(config.DYN_ALLOCATION_INITIAL_EXECUTORS.key, "2")
       .set(config.DYN_ALLOCATION_ENABLED.key, "true")
-      // The default of 30 seconds is fine, but for testing we just want to get this done fast.
-      .set("spark.storage.decommission.replicationReattemptInterval", "1")
+      // The default of 30 seconds is fine, but for testing we just want to
+      // give enough time to validate the labels are set.
+      .set("spark.storage.decommission.replicationReattemptInterval", "75")
+      // Configure labels for decommissioning pods.
+      .set("spark.kubernetes.executor.decommmissionLabel", "solong")
+      .set("spark.kubernetes.executor.decommmissionLabelValue", "cruelworld")

-    var execLogs: String = ""
+    // This is called on all exec pods but we only care about exec 0 since it's the "first."
+    // We only do this inside of this test since the other tests trigger k8s side deletes where we
+    // do not apply labels.
+    def checkFirstExecutorPodGetsLabeled(pod: Pod): Unit = {
+      if (pod.getMetadata.getName.endsWith("-1")) {
+        val client = kubernetesTestComponents.kubernetesClient
+        // The label will be added eventually, but k8s objects don't refresh.
+        Eventually.eventually(
+          PatienceConfiguration.Timeout(Span(1200, Seconds)),
+          PatienceConfiguration.Interval(Span(1, Seconds))) {
+
+          val currentPod = client.pods().withName(pod.getMetadata.getName).get
+          val labels = currentPod.getMetadata.getLabels.asScala
+
+          labels should not be (null)
+          labels should (contain key ("solong") and contain value ("cruelworld"))
+        }
+      }
+      doBasicExecutorPyPodCheck(pod)
+    }

     runSparkApplicationAndVerifyCompletion(
       appResource = PYSPARK_SCALE,
@@ -113,7 +139,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite =>
         "driver killed: 0, unexpectedly exited: 0)."),
       appArgs = Array.empty[String],
       driverPodChecker = doBasicDriverPyPodCheck,
-      executorPodChecker = doBasicExecutorPyPodCheck,
+      executorPodChecker = checkFirstExecutorPodGetsLabeled,
       isJVM = false,
       pyFiles = None,
       executorPatience = Some(None, Some(DECOMMISSIONING_FINISHED_TIMEOUT)),
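The configuration docs above name pod-deletion-cost as one intended consumer of the new label. A hypothetical companion loop, not part of this patch, that an external controller could run to translate the label into the controller.kubernetes.io/pod-deletion-cost annotation; the namespace and the label key/value are placeholders and must match whatever the job configured, while the fabric8 calls reuse the ones the patch itself relies on.

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.DefaultKubernetesClient

object DeletionCostSketch {
  def main(args: Array[String]): Unit = {
    val client = new DefaultKubernetesClient()
    try {
      // The label key/value must match whatever the job set via
      // spark.kubernetes.executor.decommmissionLabel / decommmissionLabelValue.
      val decommissioning = client.pods()
        .inNamespace("default")
        .withLabel("spark-decommissioning", "true")
        .list().getItems().asScala

      decommissioning.foreach { pod =>
        client.pods()
          .inNamespace(pod.getMetadata.getNamespace)
          .withName(pod.getMetadata.getName)
          .edit({ p: Pod =>
            new PodBuilder(p).editMetadata()
              // A lower deletion cost marks these pods as preferred for removal
              // when a ReplicaSet scales down.
              .addToAnnotations("controller.kubernetes.io/pod-deletion-cost", "-1")
              .endMetadata()
              .build()
          })
      }
    } finally {
      client.close()
    }
  }
}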
+ .set("spark.kubernetes.executor.decommmissionLabel", "solong") + .set("spark.kubernetes.executor.decommmissionLabelValue", "cruelworld") - var execLogs: String = "" + // This is called on all exec pods but we only care about exec 0 since it's the "first." + // We only do this inside of this test since the other tests trigger k8s side deletes where we + // do not apply labels. + def checkFirstExecutorPodGetsLabeled(pod: Pod): Unit = { + if (pod.getMetadata.getName.endsWith("-1")) { + val client = kubernetesTestComponents.kubernetesClient + // The label will be added eventually, but k8s objects don't refresh. + Eventually.eventually( + PatienceConfiguration.Timeout(Span(1200, Seconds)), + PatienceConfiguration.Interval(Span(1, Seconds))) { + + val currentPod = client.pods().withName(pod.getMetadata.getName).get + val labels = currentPod.getMetadata.getLabels.asScala + + labels should not be (null) + labels should (contain key ("solong") and contain value ("cruelworld")) + } + } + doBasicExecutorPyPodCheck(pod) + } runSparkApplicationAndVerifyCompletion( appResource = PYSPARK_SCALE, @@ -113,7 +139,7 @@ private[spark] trait DecommissionSuite { k8sSuite: KubernetesSuite => "driver killed: 0, unexpectedly exited: 0)."), appArgs = Array.empty[String], driverPodChecker = doBasicDriverPyPodCheck, - executorPodChecker = doBasicExecutorPyPodCheck, + executorPodChecker = checkFirstExecutorPodGetsLabeled, isJVM = false, pyFiles = None, executorPatience = Some(None, Some(DECOMMISSIONING_FINISHED_TIMEOUT)), --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org