Repository: spark

Updated Branches:
  refs/heads/master 04046e543 -> 0c2935b01
[SPARK-25515][K8S] Adds a config option to keep executor pods for debugging

## What changes were proposed in this pull request?

Keeps the K8s executor resources present in case of failure or normal termination. Introduces a new boolean config option, `spark.kubernetes.executor.deleteOnTermination`, with the default value set to `true` (pods are deleted). The idea is to update the Spark K8s backend structures but leave the resources around. The assumption is that since entries are not removed from the `removedExecutorsCache`, we are immune to updates that refer to the executor resources previously removed. The only delete operation not touched is the one in the `doKillExecutors` method, because right now we do not support [blacklisting](https://issues.apache.org/jira/browse/SPARK-23485) or dynamic allocation with Spark on K8s. We may want to handle both scenarios in the future, although that is more complicated. More tests can be added if the approach is approved.

## How was this patch tested?

Manually, by running a Spark job and verifying that the executor pods are not deleted.

Closes #23136 from skonto/keep_pods.

Authored-by: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>
Signed-off-by: Yinan Li <y...@google.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c2935b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c2935b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c2935b0

Branch: refs/heads/master
Commit: 0c2935b01def8a5f631851999d9c2d57b63763e6
Parents: 04046e5
Author: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>
Authored: Mon Dec 3 09:02:47 2018 -0800
Committer: Yinan Li <y...@google.com>
Committed: Mon Dec 3 09:02:47 2018 -0800

----------------------------------------------------------------------
 docs/running-on-kubernetes.md                          |  7 +++++++
 .../scala/org/apache/spark/deploy/k8s/Config.scala     |  7 +++++++
 .../cluster/k8s/ExecutorPodsAllocator.scala            | 15 ++++++++++-----
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala     |  8 ++++++--
 .../k8s/KubernetesClusterSchedulerBackend.scala        | 15 ++++++++++-----
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala        | 14 +++++++++++++-
 6 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
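For context, a minimal usage sketch (not part of this commit) of a job that opts out of executor-pod deletion so failed executors can be inspected afterwards; the master URL, app name, and container image below are placeholders, and only the deleteOnTermination line is specific to this change:

import org.apache.spark.sql.SparkSession

// Keep executor pods around after failure or normal termination so they
// can be inspected with kubectl; by default they would be deleted.
val spark = SparkSession.builder()
  .master("k8s://https://example-apiserver:6443")                      // placeholder API server
  .appName("keep-executor-pods-demo")                                  // placeholder app name
  .config("spark.kubernetes.container.image", "example/spark:latest")  // placeholder image
  .config("spark.kubernetes.executor.deleteOnTermination", "false")    // option added by this commit
  .getOrCreate()
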
http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 5639253..3172b1b 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -944,6 +944,13 @@ specific to Spark on Kubernetes.
     <code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`</code>
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.deleteOnTermination</code></td>
+  <td>true</td>
+  <td>
+  Specify whether executor pods should be deleted in case of failure or normal termination.
+  </td>
+</tr>
 </table>
 
 #### Pod template properties

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 1abf290..e8bf16d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -282,6 +282,13 @@ private[spark] object Config extends Logging {
 
   val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
 
+  val KUBERNETES_DELETE_EXECUTORS =
+    ConfigBuilder("spark.kubernetes.executor.deleteOnTermination")
+      .doc("If set to false then executor pods will not be deleted in case " +
+        "of failure or normal termination.")
+      .booleanConf
+      .createWithDefault(true)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 77bb9c3..ef4cbdf 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -51,6 +51,8 @@ private[spark] class ExecutorPodsAllocator(
   private val kubernetesDriverPodName = conf
     .get(KUBERNETES_DRIVER_POD_NAME)
 
+  private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   private val driverPod = kubernetesDriverPodName
     .map(name => Option(kubernetesClient.pods()
       .withName(name)
@@ -86,11 +88,14 @@ private[spark] class ExecutorPodsAllocator(
             s" cluster after $podCreationTimeout milliseconds despite the fact that a" +
             " previous allocation attempt tried to create it. The executor may have been" +
             " deleted but the application missed the deletion event.")
-          Utils.tryLogNonFatalError {
-            kubernetesClient
-              .pods()
-              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-              .delete()
+
+          if (shouldDeleteExecutors) {
+            Utils.tryLogNonFatalError {
+              kubernetesClient
+                .pods()
+                .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+                .delete()
+            }
           }
           newlyCreatedExecutors -= execId
         } else {
http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 77a1d6c..95e1ba8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -30,7 +30,7 @@ import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.util.Utils
 
 private[spark] class ExecutorPodsLifecycleManager(
-    conf: SparkConf,
+    val conf: SparkConf,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
     // Use a best-effort to track which executors have been removed already. It's not generally
@@ -43,6 +43,8 @@ private[spark] class ExecutorPodsLifecycleManager(
 
   private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
 
+  private lazy val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
     snapshotsStore.addSubscriber(eventProcessingInterval) {
       onNewSnapshots(schedulerBackend, _)
@@ -112,7 +114,9 @@ private[spark] class ExecutorPodsLifecycleManager(
       schedulerBackend: KubernetesClusterSchedulerBackend,
       execIdsRemovedInRound: mutable.Set[Long]): Unit = {
     removeExecutorFromSpark(schedulerBackend, podState, execId)
-    removeExecutorFromK8s(podState.pod)
+    if (shouldDeleteExecutors) {
+      removeExecutorFromK8s(podState.pod)
+    }
     execIdsRemovedInRound += execId
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c..6356b58 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
@@ -51,6 +52,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
 
+  private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     removeExecutor(executorId, reason)
@@ -82,11 +85,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
       pollEvents.stop()
     }
 
-    Utils.tryLogNonFatalError {
-      kubernetesClient.pods()
-        .withLabel(SPARK_APP_ID_LABEL, applicationId())
-        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-        .delete()
+    if (shouldDeleteExecutors) {
+      Utils.tryLogNonFatalError {
+        kubernetesClient.pods()
+          .withLabel(SPARK_APP_ID_LABEL, applicationId())
+          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+          .delete()
+      }
     }
 
     Utils.tryLogNonFatalError {

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 3995b2a..7411f8f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{mock, never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
@@ -30,6 +30,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.scheduler.ExecutorExited
@@ -100,6 +101,17 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }
 
+  test("Keep executor pods in k8s if configured.") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    val msg = exitReasonMessage(1, failedPod)
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+    verify(podOperations, never()).delete()
+  }
+
   private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
     val reason = Option(failedPod.getStatus.getReason)
     val message = Option(failedPod.getStatus.getMessage)
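As a usage note (not part of this commit): with spark.kubernetes.executor.deleteOnTermination=false the executor pods outlive the application, so they can be listed, inspected, and eventually cleaned up by the same labels the scheduler backend uses above. A minimal fabric8 sketch, assuming the standard "spark-app-selector"/"spark-role" label keys behind SPARK_APP_ID_LABEL and SPARK_ROLE_LABEL; the namespace and application id are placeholders:

import io.fabric8.kubernetes.client.DefaultKubernetesClient
import scala.collection.JavaConverters._

// List the executor pods left behind by a finished application.
val client = new DefaultKubernetesClient()
try {
  val leftover = client.pods()
    .inNamespace("default")                                              // placeholder namespace
    .withLabel("spark-app-selector", "spark-application-1543856567000")  // placeholder app id
    .withLabel("spark-role", "executor")
    .list()
  leftover.getItems.asScala.foreach(pod => println(pod.getMetadata.getName))
  // Once inspected, the same label selection can be deleted to reclaim the resources.
} finally {
  client.close()
}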