Repository: spark
Updated Branches:
  refs/heads/master 04046e543 -> 0c2935b01


[SPARK-25515][K8S] Adds a config option to keep executor pods for debugging

## What changes were proposed in this pull request?
Keeps K8s executor resources present in case of failure or normal termination.
Introduces a new boolean config option, `spark.kubernetes.executor.deleteOnTermination`, with a default value of true.
The idea is to update the Spark K8s backend structures but leave the resources around.
The assumption is that since entries are not removed from the `removedExecutorsCache`, we are immune to updates that refer to executor resources previously removed.
The only delete operation not touched is the one in the `doKillExecutors` method.
The reason is that right now we don't support [blacklisting](https://issues.apache.org/jira/browse/SPARK-23485) or dynamic allocation with Spark on K8s. We might want to handle both scenarios in the future, although it is more complicated.
More tests can be added if the approach is approved.
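
For reference, a minimal usage sketch of the new option (illustrative only; the master URL and app name below are placeholders, not part of this patch):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Keep executor pods around after failure or normal termination so they can
// be inspected afterwards (e.g. with `kubectl describe pod` / `kubectl logs`).
val conf = new SparkConf()
  .setMaster("k8s://https://kubernetes.default.svc")  // placeholder master URL
  .setAppName("keep-executor-pods-demo")              // placeholder app name
  .set("spark.kubernetes.executor.deleteOnTermination", "false")

val spark = SparkSession.builder().config(conf).getOrCreate()
```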
## How was this patch tested?
Manually, by running a Spark job and verifying that pods are not deleted.

Closes #23136 from skonto/keep_pods.

Authored-by: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>
Signed-off-by: Yinan Li <y...@google.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c2935b0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c2935b0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c2935b0

Branch: refs/heads/master
Commit: 0c2935b01def8a5f631851999d9c2d57b63763e6
Parents: 04046e5
Author: Stavros Kontopoulos <stavros.kontopou...@lightbend.com>
Authored: Mon Dec 3 09:02:47 2018 -0800
Committer: Yinan Li <y...@google.com>
Committed: Mon Dec 3 09:02:47 2018 -0800

----------------------------------------------------------------------
 docs/running-on-kubernetes.md                        |  7 +++++++
 .../scala/org/apache/spark/deploy/k8s/Config.scala   |  7 +++++++
 .../cluster/k8s/ExecutorPodsAllocator.scala          | 15 ++++++++++-----
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala   |  8 ++++++--
 .../k8s/KubernetesClusterSchedulerBackend.scala      | 15 ++++++++++-----
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala      | 14 +++++++++++++-
 6 files changed, 53 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/docs/running-on-kubernetes.md
----------------------------------------------------------------------
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 5639253..3172b1b 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -944,6 +944,13 @@ specific to Spark on Kubernetes.
    <code>spark.kubernetes.executor.podTemplateFile=/path/to/executor-pod-template.yaml`</code>
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.executor.deleteOnTermination</code></td>
+  <td>true</td>
+  <td>
+  Specify whether executor pods should be deleted in case of failure or normal termination.
+  </td>
+</tr>
 </table>
 
 #### Pod template properties

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 1abf290..e8bf16d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -282,6 +282,13 @@ private[spark] object Config extends Logging {
 
   val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
 
+  val KUBERNETES_DELETE_EXECUTORS =
+    ConfigBuilder("spark.kubernetes.executor.deleteOnTermination")
+      .doc("If set to false then executor pods will not be deleted in case " +
+        "of failure or normal termination.")
+      .booleanConf
+      .createWithDefault(true)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SECRETS_PREFIX = "spark.kubernetes.driver.secrets."

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 77bb9c3..ef4cbdf 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -51,6 +51,8 @@ private[spark] class ExecutorPodsAllocator(
   private val kubernetesDriverPodName = conf
     .get(KUBERNETES_DRIVER_POD_NAME)
 
+  private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   private val driverPod = kubernetesDriverPodName
     .map(name => Option(kubernetesClient.pods()
       .withName(name)
@@ -86,11 +88,14 @@ private[spark] class ExecutorPodsAllocator(
           s" cluster after $podCreationTimeout milliseconds despite the fact 
that a" +
           " previous allocation attempt tried to create it. The executor may 
have been" +
           " deleted but the application missed the deletion event.")
-        Utils.tryLogNonFatalError {
-          kubernetesClient
-            .pods()
-            .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
-            .delete()
+
+        if (shouldDeleteExecutors) {
+          Utils.tryLogNonFatalError {
+            kubernetesClient
+              .pods()
+              .withLabel(SPARK_EXECUTOR_ID_LABEL, execId.toString)
+              .delete()
+          }
         }
         newlyCreatedExecutors -= execId
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 77a1d6c..95e1ba8 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -30,7 +30,7 @@ import org.apache.spark.scheduler.ExecutorExited
 import org.apache.spark.util.Utils
 
 private[spark] class ExecutorPodsLifecycleManager(
-    conf: SparkConf,
+    val conf: SparkConf,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
     // Use a best-effort to track which executors have been removed already. It's not generally
@@ -43,6 +43,8 @@ private[spark] class ExecutorPodsLifecycleManager(
 
   private val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
 
+  private lazy val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
     snapshotsStore.addSubscriber(eventProcessingInterval) {
       onNewSnapshots(schedulerBackend, _)
@@ -112,7 +114,9 @@ private[spark] class ExecutorPodsLifecycleManager(
       schedulerBackend: KubernetesClusterSchedulerBackend,
       execIdsRemovedInRound: mutable.Set[Long]): Unit = {
     removeExecutorFromSpark(schedulerBackend, podState, execId)
-    removeExecutorFromK8s(podState.pod)
+    if (shouldDeleteExecutors) {
+      removeExecutorFromK8s(podState.pod)
+    }
     execIdsRemovedInRound += execId
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c..6356b58 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.ExecutorService
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.concurrent.{ExecutionContext, Future}
 
+import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.rpc.{RpcAddress, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
@@ -51,6 +52,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val initialExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
 
+  private val shouldDeleteExecutors = conf.get(KUBERNETES_DELETE_EXECUTORS)
+
   // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     removeExecutor(executorId, reason)
@@ -82,11 +85,13 @@ private[spark] class KubernetesClusterSchedulerBackend(
       pollEvents.stop()
     }
 
-    Utils.tryLogNonFatalError {
-      kubernetesClient.pods()
-        .withLabel(SPARK_APP_ID_LABEL, applicationId())
-        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-        .delete()
+    if (shouldDeleteExecutors) {
+      Utils.tryLogNonFatalError {
+        kubernetesClient.pods()
+          .withLabel(SPARK_APP_ID_LABEL, applicationId())
+          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+          .delete()
+      }
     }
 
     Utils.tryLogNonFatalError {

http://git-wip-us.apache.org/repos/asf/spark/blob/0c2935b0/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index 3995b2a..7411f8f 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -22,7 +22,7 @@ import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.Matchers.any
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{mock, never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.scalatest.BeforeAndAfter
@@ -30,6 +30,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.deploy.k8s.KubernetesUtils._
 import org.apache.spark.scheduler.ExecutorExited
@@ -100,6 +101,17 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }
 
+  test("Keep executor pods in k8s if configured.") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
+    snapshotsStore.updatePod(failedPod)
+    snapshotsStore.notifySubscribers()
+    val msg = exitReasonMessage(1, failedPod)
+    val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
+    verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
+    verify(podOperations, never()).delete()
+  }
+
   private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
     val reason = Option(failedPod.getStatus.getReason)
     val message = Option(failedPod.getStatus.getMessage)

