This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ff3f3c4  [SPARK-36058][K8S] Add support for statefulset APIs in K8s
ff3f3c4 is described below

commit ff3f3c45668364da9bd10992791b5ee9a46fea21
Author: Holden Karau <hka...@netflix.com>
AuthorDate: Wed Aug 25 17:38:57 2021 -0700

    [SPARK-36058][K8S] Add support for statefulset APIs in K8s
    
    ### What changes were proposed in this pull request?
    
    Generalize the pod allocator and add support for statefulsets.
    
    ### Why are the changes needed?
    
    Allocating individual pods in Spark is not ideal for some clusters, and
    using higher-level operators like statefulsets and replicasets can be useful.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, new config options.
    
    ### How was this patch tested?
    
    Completed: New unit & basic integration test
    PV integration tests
    
    Closes #33508 from holdenk/SPARK-36058-support-replicasets-or-job-api-like-things.
    
    Lead-authored-by: Holden Karau <hka...@netflix.com>
    Co-authored-by: Holden Karau <hol...@pigscanfly.ca>
    Signed-off-by: Holden Karau <hka...@netflix.com>
---
 .../executor/CoarseGrainedExecutorBackend.scala    |   2 +-
 .../scala/org/apache/spark/storage/DiskStore.scala |   9 +-
 .../apache/spark/examples/MiniReadWriteTest.scala  | 139 +++++++++++++
 .../scala/org/apache/spark/deploy/k8s/Config.scala |  10 +
 .../org/apache/spark/deploy/k8s/Constants.scala    |   1 +
 .../k8s/features/BasicExecutorFeatureStep.scala    |  15 +-
 .../cluster/k8s/AbstractPodsAllocator.scala        |  59 ++++++
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  18 +-
 .../cluster/k8s/ExecutorPodsSnapshot.scala         |  11 +-
 .../cluster/k8s/KubernetesClusterManager.scala     |  38 +++-
 .../cluster/k8s/KubernetesClusterMessage.scala     |  21 ++
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  41 +++-
 .../cluster/k8s/KubernetesExecutorBackend.scala    | 228 +++++++++++++++++++++
 .../cluster/k8s/StatefulsetPodsAllocator.scala     | 201 ++++++++++++++++++
 .../features/BasicExecutorFeatureStepSuite.scala   |   6 +-
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   |  31 ++-
 .../k8s/KubernetesClusterManagerSuite.scala        |  58 ++++++
 .../KubernetesClusterSchedulerBackendSuite.scala   |  13 +-
 .../cluster/k8s/StatefulsetAllocatorSuite.scala    | 153 ++++++++++++++
 .../src/main/dockerfiles/spark/entrypoint.sh       |   3 +-
 .../k8s/integrationtest/BasicTestsSuite.scala      |  17 ++
 .../k8s/integrationtest/KubernetesSuite.scala      |  33 ++-
 .../deploy/k8s/integrationtest/PVTestsSuite.scala  |  88 +++++++-
 23 files changed, 1148 insertions(+), 47 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 76d01f8..c87e61a 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -61,7 +61,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   private implicit val formats = DefaultFormats
 
-  private[executor] val stopping = new AtomicBoolean(false)
+  private[spark] val stopping = new AtomicBoolean(false)
   var executor: Executor = null
   @volatile var driver: Option[RpcEndpointRef] = None
 
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index 7269913..f0334c5 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -59,7 +59,14 @@ private[spark] class DiskStore(
    */
   def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
     if (contains(blockId)) {
-      throw new IllegalStateException(s"Block $blockId is already present in 
the disk store")
+      logWarning(s"Block $blockId is already present in the disk store")
+      try {
+        diskManager.getFile(blockId).delete()
+      } catch {
+        case e: Exception =>
+          throw new IllegalStateException(
+            s"Block $blockId is already present in the disk store and could 
not delete it $e")
+      }
     }
     logDebug(s"Attempting to put block $blockId")
     val startTimeNs = System.nanoTime()
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
new file mode 100644
index 0000000..5a74e1c
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples
+
+import java.io.File
+import java.io.PrintWriter
+
+import scala.io.Source._
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.util.Utils
+
+/**
+ * Simple test for reading and writing to a distributed
+ * file system.  This example does the following:
+ *
+ *   1. Reads local file
+ *   2. Computes word count on local file
+ *   3. Writes local file to a local dir on each executor
+ *   4. Reads the file back from each exec
+ *   5. Computes word count on the file using Spark
+ *   6. Compares the word count results
+ */
+object MiniReadWriteTest {
+
+  private val NPARAMS = 1
+
+  private def readFile(filename: String): List[String] = {
+    Utils.tryWithResource(fromFile(filename))(_.getLines().toList)
+  }
+
+  private def printUsage(): Unit = {
+    val usage = """Mini Read-Write Test
+    |Usage: localFile
+    |localFile - (string) location of local file to distribute to 
executors.""".stripMargin
+
+    println(usage)
+  }
+
+  private def parseArgs(args: Array[String]): File = {
+    if (args.length != NPARAMS) {
+      printUsage()
+      System.exit(1)
+    }
+
+    var i = 0
+
+    val localFilePath = new File(args(i))
+    if (!localFilePath.exists) {
+      System.err.println(s"Given path (${args(i)}) does not exist")
+      printUsage()
+      System.exit(1)
+    }
+
+    if (!localFilePath.isFile) {
+      System.err.println(s"Given path (${args(i)}) is not a file")
+      printUsage()
+      System.exit(1)
+    }
+    localFilePath
+  }
+
+  def runLocalWordCount(fileContents: List[String]): Int = {
+    fileContents.flatMap(_.split(" "))
+      .flatMap(_.split("\t"))
+      .filter(_.nonEmpty)
+      .groupBy(w => w)
+      .mapValues(_.size)
+      .values
+      .sum
+  }
+
+  def main(args: Array[String]): Unit = {
+    val localFilePath = parseArgs(args)
+
+    println(s"Performing local word count from ${localFilePath}")
+    val fileContents = readFile(localFilePath.toString())
+    println(s"File contents are ${fileContents}")
+    val localWordCount = runLocalWordCount(fileContents)
+
+    println("Creating SparkSession")
+    val spark = SparkSession
+      .builder
+      .appName("Mini Read Write Test")
+      .getOrCreate()
+
+    println("Writing local file to executors")
+
+    // uses the fact default parallelism is greater than num execs
+    val misc = spark.sparkContext.parallelize(1.to(10))
+    misc.foreachPartition {
+      x =>
+        new PrintWriter(localFilePath) {
+          try {
+            write(fileContents.mkString("\n"))
+          } finally {
+            close()
+          }}
+    }
+
+    println("Reading file from execs and running Word Count")
+    val readFileRDD = spark.sparkContext.textFile(localFilePath.toString())
+
+    val dWordCount = readFileRDD
+      .flatMap(_.split(" "))
+      .flatMap(_.split("\t"))
+      .filter(_.nonEmpty)
+      .map(w => (w, 1))
+      .countByKey()
+      .values
+      .sum
+
+    spark.stop()
+    if (localWordCount == dWordCount) {
+      println(s"Success! Local Word Count $localWordCount and " +
+        s"D Word Count $dWordCount agree.")
+    } else {
+      println(s"Failure! Local Word Count $localWordCount " +
+        s"and D Word Count $dWordCount disagree.")
+    }
+  }
+}
+// scalastyle:on println
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 2c4adee..2aa4fbc 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -323,6 +323,16 @@ private[spark] object Config extends Logging {
       .stringConf
       .createOptional
 
+  val KUBERNETES_ALLOCATION_PODS_ALLOCATOR =
+    ConfigBuilder("spark.kubernetes.allocation.pods.allocator")
+      .doc("Allocator to use for pods. Possible values are direct (the 
default) and statefulset " +
+        ", or a full class name of a class implementing AbstractPodsAllocator. 
" +
+        "Future version may add Job or replicaset. This is a developer API and 
may change " +
+      "or be removed at anytime.")
+      .version("3.3.0")
+      .stringConf
+      .createWithDefault("direct")
+
   val KUBERNETES_ALLOCATION_BATCH_SIZE =
     ConfigBuilder("spark.kubernetes.allocation.batch.size")
       .doc("Number of pods to launch at once in each round of executor 
allocation.")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index 543ca12..f4b362b 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -59,6 +59,7 @@ private[spark] object Constants {
   val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID"
   val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID"
   val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP"
+  val ENV_EXECUTOR_POD_NAME = "SPARK_EXECUTOR_POD_NAME"
   val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_"
   val ENV_CLASSPATH = "SPARK_CLASSPATH"
   val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS"
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
index a90d2ab..4d56468 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala
@@ -139,6 +139,13 @@ private[spark] class BasicExecutorFeatureStep(
             .build())
           .build())
       } ++ {
+        Seq(new EnvVarBuilder()
+          .withName(ENV_EXECUTOR_POD_NAME)
+          .withValueFrom(new EnvVarSourceBuilder()
+            .withNewFieldRef("v1", "metadata.name")
+            .build())
+          .build())
+      } ++ {
         if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) {
           Option(secMgr.getSecretKey()).map { authSecret =>
             new EnvVarBuilder()
@@ -260,16 +267,22 @@ private[spark] class BasicExecutorFeatureStep(
         .withUid(pod.getMetadata.getUid)
         .build()
     }
+
+    val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) 
match {
+      case "statefulset" => "Always"
+      case _ => "Never"
+    }
     val executorPodBuilder = new PodBuilder(pod.pod)
       .editOrNewMetadata()
         .withName(name)
         .addToLabels(kubernetesConf.labels.asJava)
+        .addToLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, 
resourceProfile.id.toString)
         .addToAnnotations(kubernetesConf.annotations.asJava)
         .addToOwnerReferences(ownerReference.toSeq: _*)
         .endMetadata()
       .editOrNewSpec()
         .withHostname(hostname)
-        .withRestartPolicy("Never")
+        .withRestartPolicy(policy)
         .addToNodeSelector(kubernetesConf.nodeSelector.asJava)
         .addToNodeSelector(kubernetesConf.executorNodeSelector.asJava)
         .addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*)
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala
new file mode 100644
index 0000000..2e0d4fa
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model.Pod
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.resource.ResourceProfile
+
+
+/**
+ * :: DeveloperApi ::
+ * An abstract interface for allowing different types of pods allocation.
+ *
+ * The internal Spark implementations are [[StatefulsetPodsAllocator]]
+ * and [[ExecutorPodsAllocator]]. This may be useful for folks integrating 
with custom schedulers
+ * such as Volcano, Yunikorn, etc.
+ *
+ * This API may change or be removed at any time.
+ *
+ * @since 3.3.0
+ */
+@DeveloperApi
+abstract class AbstractPodsAllocator {
+  /*
+   * Set the total expected executors for an application
+   */
+  def setTotalExpectedExecutors(resourceProfileToTotalExecs: 
Map[ResourceProfile, Int]): Unit
+  /*
+   * Reference to driver pod.
+   */
+  def driverPod: Option[Pod]
+  /*
+   * If the pod for a given exec id is deleted.
+   */
+  def isDeleted(executorId: String): Boolean
+  /*
+   * Start hook.
+   */
+  def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit
+  /*
+   * Stop hook
+   */
+  def stop(applicationId: String): Unit
+}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index cee5360..043f0e9 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -37,13 +37,13 @@ import 
org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.util.{Clock, Utils}
 
-private[spark] class ExecutorPodsAllocator(
+class ExecutorPodsAllocator(
     conf: SparkConf,
     secMgr: SecurityManager,
     executorBuilder: KubernetesExecutorBuilder,
     kubernetesClient: KubernetesClient,
     snapshotsStore: ExecutorPodsSnapshotsStore,
-    clock: Clock) extends Logging {
+    clock: Clock) extends AbstractPodsAllocator() with Logging {
 
   private val EXECUTOR_ID_COUNTER = new AtomicInteger(0)
 
@@ -97,12 +97,15 @@ private[spark] class ExecutorPodsAllocator(
 
   private var lastSnapshot = ExecutorPodsSnapshot()
 
+  private var appId: String = _
+
   // Executors that have been deleted by this allocator but not yet detected 
as deleted in
   // a snapshot from the API server. This is used to deny registration from 
these executors
   // if they happen to come up before the deletion takes effect.
   @volatile private var deletedExecutorIds = Set.empty[Long]
 
   def start(applicationId: String, schedulerBackend: 
KubernetesClusterSchedulerBackend): Unit = {
+    appId = applicationId
     driverPod.foreach { pod =>
       // Wait until the driver pod is ready before starting executors, as the 
headless service won't
       // be resolvable by DNS until the driver pod is ready.
@@ -461,6 +464,16 @@ private[spark] class ExecutorPodsAllocator(
         true
     }
   }
+
+  override def stop(applicationId: String): Unit = {
+    Utils.tryLogNonFatalError {
+      kubernetesClient
+        .pods()
+        .withLabel(SPARK_APP_ID_LABEL, applicationId)
+        .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+        .delete()
+    }
+  }
 }
 
 private[spark] object ExecutorPodsAllocator {
@@ -471,5 +484,4 @@ private[spark] object ExecutorPodsAllocator {
     val r = slots % consumers.size
     consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - 
r).map((_, d))
   }
-
 }
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
index 76c17cf..ff47c17 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala
@@ -60,8 +60,15 @@ object ExecutorPodsSnapshot extends Logging {
   }
 
   private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, 
ExecutorPodState] = {
-    executorPods.map { pod =>
-      (pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, 
toState(pod))
+    executorPods.flatMap { pod =>
+      pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) match {
+        case "EXECID" | null =>
+          // The exec label has not yet been assigned
+          None
+        case id =>
+          // We have a "real" id label
+          Some((id.toLong, toState(pod)))
+      }
     }.toMap
   }
 
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 4d28d38..9497349 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -19,14 +19,15 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.io.File
 
 import io.fabric8.kubernetes.client.Config
+import io.fabric8.kubernetes.client.KubernetesClient
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, 
SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, 
TaskScheduler, TaskSchedulerImpl}
-import org.apache.spark.util.{SystemClock, ThreadUtils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterManager extends ExternalClusterManager 
with Logging {
 
@@ -109,13 +110,7 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
       kubernetesClient,
       snapshotsStore)
 
-    val executorPodsAllocator = new ExecutorPodsAllocator(
-      sc.conf,
-      sc.env.securityManager,
-      new KubernetesExecutorBuilder(),
-      kubernetesClient,
-      snapshotsStore,
-      new SystemClock())
+    val executorPodsAllocator = makeExecutorPodsAllocator(sc, 
kubernetesClient, snapshotsStore)
 
     val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
       snapshotsStore,
@@ -138,6 +133,31 @@ private[spark] class KubernetesClusterManager extends 
ExternalClusterManager wit
       podsPollingEventSource)
   }
 
+  private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, 
kubernetesClient: KubernetesClient,
+      snapshotsStore: ExecutorPodsSnapshotsStore) = {
+    val executorPodsAllocatorName = 
sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
+      case "statefulset" =>
+        classOf[StatefulsetPodsAllocator].getName
+      case "direct" =>
+        classOf[ExecutorPodsAllocator].getName
+      case fullClass =>
+        fullClass
+    }
+
+    val cls = 
Utils.classForName[AbstractPodsAllocator](executorPodsAllocatorName)
+    val cstr = cls.getConstructor(
+      classOf[SparkConf], classOf[org.apache.spark.SecurityManager],
+      classOf[KubernetesExecutorBuilder], classOf[KubernetesClient],
+      classOf[ExecutorPodsSnapshotsStore], classOf[Clock])
+    cstr.newInstance(
+      sc.conf,
+      sc.env.securityManager,
+      new KubernetesExecutorBuilder(),
+      kubernetesClient,
+      snapshotsStore,
+      new SystemClock())
+  }
+
   override def initialize(scheduler: TaskScheduler, backend: 
SchedulerBackend): Unit = {
     scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
   }
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterMessage.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterMessage.scala
new file mode 100644
index 0000000..4a8dae6
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterMessage.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.io.Serializable
+
+case class GenerateExecID(podName: String) extends Serializable
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 25f6851..d8e97e1 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.concurrent.Future
@@ -46,7 +47,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     kubernetesClient: KubernetesClient,
     executorService: ScheduledExecutorService,
     snapshotsStore: ExecutorPodsSnapshotsStore,
-    podAllocator: ExecutorPodsAllocator,
+    podAllocator: AbstractPodsAllocator,
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
     watchEvents: ExecutorPodsWatchSnapshotSource,
     pollEvents: ExecutorPodsPollingSnapshotSource)
@@ -97,10 +98,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   override def start(): Unit = {
     super.start()
+    // Must be called before setting the executors
+    podAllocator.start(applicationId(), this)
     val initExecs = Map(defaultProfile -> initialExecutors)
     podAllocator.setTotalExpectedExecutors(initExecs)
     lifecycleEventHandler.start(this)
-    podAllocator.start(applicationId(), this)
     watchEvents.start(applicationId())
     pollEvents.start(applicationId())
     if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) {
@@ -144,13 +146,9 @@ private[spark] class KubernetesClusterSchedulerBackend(
     }
 
     if (shouldDeleteExecutors) {
-      Utils.tryLogNonFatalError {
-        kubernetesClient
-          .pods()
-          .withLabel(SPARK_APP_ID_LABEL, applicationId())
-          .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
-          .delete()
-      }
+
+      podAllocator.stop(applicationId())
+
       if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) {
         Utils.tryLogNonFatalError {
           kubernetesClient
@@ -278,6 +276,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     new KubernetesDriverEndpoint()
   }
 
+  val execId = new AtomicInteger(0)
+
   override protected def createTokenManager(): 
Option[HadoopDelegationTokenManager] = {
     Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, 
driverEndpoint))
   }
@@ -287,12 +287,33 @@ private[spark] class KubernetesClusterSchedulerBackend(
   }
 
   private class KubernetesDriverEndpoint extends DriverEndpoint {
+    private def generateExecID(context: RpcCallContext): PartialFunction[Any, 
Unit] = {
+      case x: GenerateExecID =>
+        val newId = execId.incrementAndGet().toString
+        context.reply(newId)
+        // Generally this should complete quickly but safer to not block 
in case we're in the
+        // middle of an etcd fail over or otherwise slower writes.
+        val labelTask = new Runnable() {
+          override def run(): Unit = Utils.tryLogNonFatalError {
+            // Label the pod with its exec ID
+            kubernetesClient.pods()
+              .withName(x.podName)
+              .edit({p: Pod => new PodBuilder(p).editMetadata()
+                .addToLabels(SPARK_EXECUTOR_ID_LABEL, newId.toString)
+                .endMetadata()
+                .build()})
+          }
+        }
+        executorService.execute(labelTask)
+    }
     private def ignoreRegisterExecutorAtStoppedContext: PartialFunction[Any, 
Unit] = {
       case _: RegisterExecutor if sc.isStopped => // No-op
     }
 
     override def receiveAndReply(context: RpcCallContext): 
PartialFunction[Any, Unit] =
-      
ignoreRegisterExecutorAtStoppedContext.orElse(super.receiveAndReply(context))
+      generateExecID(context).orElse(
+        ignoreRegisterExecutorAtStoppedContext.orElse(
+          super.receiveAndReply(context)))
 
     override def onDisconnected(rpcAddress: RpcAddress): Unit = {
       // Don't do anything besides disabling the executor - allow the 
Kubernetes API events to
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
new file mode 100644
index 0000000..dd06688
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import org.apache.spark._
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.worker.WorkerWatcher
+import org.apache.spark.executor.CoarseGrainedExecutorBackend
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
+import org.apache.spark.rpc._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesExecutorBackend extends Logging {
+
+  // Message used internally to start the executor when the driver successfully accepted the
+  // registration request.
+  case object RegisteredExecutor
+
+  /**
+   * Parsed command-line options for the executor entry point, as produced by
+   * `parseArguments`.
+   *
+   * @param driverUrl value of `--driver-url`
+   * @param executorId value of `--executor-id`; `run` asks the driver for an ID when this is
+   *                   null, empty or the literal "EXECID"
+   * @param bindAddress value of `--bind-address`, or `hostname` when the flag is absent
+   * @param hostname value of `--hostname`, or `Utils.localHostName()` when the flag is absent
+   * @param cores value of `--cores`
+   * @param appId value of `--app-id`
+   * @param workerUrl value of `--worker-url`, if given
+   * @param resourcesFileOpt value of `--resourcesFile`, if given
+   * @param resourceProfileId value of `--resourceProfileId`, or `DEFAULT_RESOURCE_PROFILE_ID`
+   * @param podName value of `--podName`; null when the flag is absent
+   */
+  case class Arguments(
+      driverUrl: String,
+      executorId: String,
+      bindAddress: String,
+      hostname: String,
+      cores: Int,
+      appId: String,
+      workerUrl: Option[String],
+      resourcesFileOpt: Option[String],
+      resourceProfileId: Int,
+      podName: String)
+
+  /**
+   * JVM entry point: parses the command line, runs the executor through `run` with a factory
+   * that builds a plain [[CoarseGrainedExecutorBackend]], and exits with status 0 once `run`
+   * returns.
+   */
+  def main(args: Array[String]): Unit = {
+    // Builds the backend from the parsed arguments plus the values resolved during `run`.
+    def newBackend(
+        rpcEnv: RpcEnv,
+        arguments: Arguments,
+        env: SparkEnv,
+        resourceProfile: ResourceProfile,
+        execId: String): CoarseGrainedExecutorBackend = {
+      new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId,
+        arguments.bindAddress, arguments.hostname, arguments.cores,
+        env, arguments.resourcesFileOpt, resourceProfile)
+    }
+
+    // Drop the trailing "$" (if any) so the usage text shows a clean class name.
+    val entryClassName = this.getClass.getCanonicalName.stripSuffix("$")
+    run(parseArguments(args, entryClassName), newBackend)
+    System.exit(0)
+  }
+
+  /**
+   * Bootstraps and runs an executor.
+   *
+   * Connects to the driver through a temporary RPC environment, fetches the application
+   * configuration, resolves the executor ID (asking the driver for one when the supplied ID is
+   * null, empty or the "EXECID" placeholder), builds the executor's [[SparkEnv]], registers
+   * the backend endpoint and then blocks until the RPC environment terminates.
+   *
+   * @param arguments parsed command-line options
+   * @param backendCreateFn factory for the backend endpoint; it receives the executor's RPC
+   *                        environment, the arguments, the new [[SparkEnv]], the resource
+   *                        profile returned by the driver and the resolved executor ID
+   */
+  def run(
+      arguments: Arguments,
+      backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) =>
+        CoarseGrainedExecutorBackend): Unit = {
+
+    Utils.initDaemon(log)
+
+    SparkHadoopUtil.get.runAsSparkUser { () =>
+      // Debug code: check the hostname before any RPC environment is created.
+      Utils.checkHost(arguments.hostname)
+
+      // Bootstrap to fetch the driver's Spark properties.
+      val executorConf = new SparkConf
+      val fetcher = RpcEnv.create(
+        "driverPropsFetcher",
+        arguments.bindAddress,
+        arguments.hostname,
+        -1,
+        executorConf,
+        new SecurityManager(executorConf),
+        numUsableCores = 0,
+        clientMode = true)
+
+      // Try up to nTries times to resolve the driver endpoint; the last failure is rethrown.
+      var driver: RpcEndpointRef = null
+      val nTries = 3
+      for (i <- 0 until nTries if driver == null) {
+        try {
+          driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
+        } catch {
+          // Only ordinary failures are retried. Fatal errors (out of memory, interrupts, ...)
+          // propagate immediately instead of being swallowed by the retry loop.
+          case scala.util.control.NonFatal(e) => if (i == nTries - 1) {
+            throw e
+          }
+        }
+      }
+
+      val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId))
+      val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId))
+      val execId: String = arguments.executorId match {
+        case null | "EXECID" | "" =>
+          // We need to resolve the exec id dynamically
+          driver.askSync[String](GenerateExecID(arguments.podName))
+        case id =>
+          id
+      }
+      // The bootstrap RPC environment is only needed for the requests above.
+      fetcher.shutdown()
+
+      // Create SparkEnv using properties we fetched from the driver.
+      val driverConf = new SparkConf()
+      for ((key, value) <- props) {
+        // this is required for SSL in standalone mode
+        if (SparkConf.isExecutorStartupConf(key)) {
+          driverConf.setIfMissing(key, value)
+        } else {
+          driverConf.set(key, value)
+        }
+      }
+
+      cfg.hadoopDelegationCreds.foreach { tokens =>
+        SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf)
+      }
+
+      // Record the resolved ID (possibly assigned by the driver) in the executor's conf.
+      driverConf.set(EXECUTOR_ID, execId)
+      val env = SparkEnv.createExecutorEnv(driverConf, execId, arguments.bindAddress,
+        arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
+
+      val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile, execId)
+      env.rpcEnv.setupEndpoint("Executor", backend)
+      arguments.workerUrl.foreach { url =>
+        env.rpcEnv.setupEndpoint("WorkerWatcher",
+          new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping))
+      }
+      env.rpcEnv.awaitTermination()
+    }
+  }
+
+  /**
+   * Parses the executor command line into an [[Arguments]] instance.
+   *
+   * Options are consumed as `--flag value` pairs. An unrecognized option, or a missing
+   * `--driver-url`, `--executor-id`, `--app-id` or positive `--cores`, prints the usage text
+   * and exits. A missing `--hostname` falls back to `Utils.localHostName()`, and a missing
+   * `--bind-address` falls back to the hostname.
+   *
+   * @param args raw command-line arguments
+   * @param classNameForEntry class name shown in the usage text
+   * @return the parsed arguments
+   */
+  def parseArguments(args: Array[String], classNameForEntry: String): Arguments = {
+    var driverUrl: String = null
+    var executorId: String = null
+    var bindAddress: String = null
+    var hostname: String = null
+    var cores: Int = 0
+    var resourcesFileOpt: Option[String] = None
+    var appId: String = null
+    var workerUrl: Option[String] = None
+    var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID
+    var podName: String = null
+
+    // Consumes one "--flag value" pair per step until the argument list is exhausted.
+    @scala.annotation.tailrec
+    def consume(remaining: List[String]): Unit = remaining match {
+      case "--driver-url" :: value :: rest =>
+        driverUrl = value
+        consume(rest)
+      case "--executor-id" :: value :: rest =>
+        executorId = value
+        consume(rest)
+      case "--bind-address" :: value :: rest =>
+        bindAddress = value
+        consume(rest)
+      case "--hostname" :: value :: rest =>
+        hostname = value
+        consume(rest)
+      case "--cores" :: value :: rest =>
+        cores = value.toInt
+        consume(rest)
+      case "--resourcesFile" :: value :: rest =>
+        resourcesFileOpt = Some(value)
+        consume(rest)
+      case "--app-id" :: value :: rest =>
+        appId = value
+        consume(rest)
+      case "--worker-url" :: value :: rest =>
+        // Worker url is used in spark standalone mode to enforce fate-sharing with worker
+        workerUrl = Some(value)
+        consume(rest)
+      case "--resourceProfileId" :: value :: rest =>
+        resourceProfileId = value.toInt
+        consume(rest)
+      case "--podName" :: value :: rest =>
+        podName = value
+        consume(rest)
+      case Nil =>
+      case unknown =>
+        // scalastyle:off println
+        System.err.println(s"Unrecognized options: ${unknown.mkString(" ")}")
+        // scalastyle:on println
+        printUsageAndExit(classNameForEntry)
+    }
+    consume(args.toList)
+
+    if (hostname == null) {
+      hostname = Utils.localHostName()
+      log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
+    }
+
+    val missingRequired = driverUrl == null || executorId == null || cores <= 0 || appId == null
+    if (missingRequired) {
+      printUsageAndExit(classNameForEntry)
+    }
+
+    // Bind to the advertised hostname unless an explicit bind address was supplied.
+    val resolvedBindAddress = Option(bindAddress).getOrElse(hostname)
+
+    Arguments(driverUrl, executorId, resolvedBindAddress, hostname, cores, appId, workerUrl,
+      resourcesFileOpt, resourceProfileId, podName)
+  }
+
+  /**
+   * Writes the command-line usage text to stderr and terminates the JVM with status 1.
+   *
+   * @param classNameForEntry class name shown on the "Usage:" line
+   */
+  private def printUsageAndExit(classNameForEntry: String): Unit = {
+    val usage =
+      s"""
+      |Usage: $classNameForEntry [options]
+      |
+      | Options are:
+      |   --driver-url <driverUrl>
+      |   --executor-id <executorId>
+      |   --bind-address <bindAddress>
+      |   --hostname <hostname>
+      |   --cores <cores>
+      |   --resourcesFile <fileWithJSONResourceInformation>
+      |   --app-id <appid>
+      |   --worker-url <workerUrl>
+      |   --resourceProfileId <id>
+      |   --podName <podName>
+      |""".stripMargin
+    // scalastyle:off println
+    System.err.println(usage)
+    // scalastyle:on println
+    System.exit(1)
+  }
+}
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetPodsAllocator.scala
new file mode 100644
index 0000000..0d00d96
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetPodsAllocator.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim,
+  PersistentVolumeClaimBuilder, PodSpec, PodSpecBuilder, PodTemplateSpec}
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesConf
+import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference
+import org.apache.spark.internal.Logging
+import org.apache.spark.resource.ResourceProfile
+import org.apache.spark.util.{Clock, Utils}
+
+class StatefulsetPodsAllocator(
+    conf: SparkConf,
+    secMgr: SecurityManager,
+    executorBuilder: KubernetesExecutorBuilder,
+    kubernetesClient: KubernetesClient,
+    snapshotsStore: ExecutorPodsSnapshotsStore,
+    clock: Clock) extends AbstractPodsAllocator() with Logging {
+
+  private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile]
+
+  private val driverPodReadinessTimeout = 
conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT)
+
+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
+  private val kubernetesDriverPodName = conf
+    .get(KUBERNETES_DRIVER_POD_NAME)
+
+  val driverPod = kubernetesDriverPodName
+    .map(name => Option(kubernetesClient.pods()
+      .withName(name)
+      .get())
+      .getOrElse(throw new SparkException(
+        s"No pod was found named $name in the cluster in the " +
+          s"namespace $namespace (this was supposed to be the driver pod.).")))
+
+  private var appId: String = _
+
+  /**
+   * Records the application ID and, when a driver pod is known, waits for that pod to become
+   * ready, bounded by the configured driver readiness timeout (in seconds). The wait runs
+   * inside `Utils.tryLogNonFatalError`.
+   *
+   * @param applicationId ID stored for later calls to `setTotalExpectedExecutors`
+   * @param schedulerBackend not used by this allocator
+   */
+  def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
+    appId = applicationId
+    // Executors must not be started before the driver pod is ready: the headless service
+    // won't be resolvable by DNS until then.
+    for (pod <- driverPod) {
+      Utils.tryLogNonFatalError {
+        val driverPodResource = kubernetesClient.pods().withName(pod.getMetadata.getName)
+        driverPodResource.waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
+      }
+    }
+  }
+
+  /**
+   * Applies the requested executor count of every resource profile to its statefulset.
+   *
+   * Each profile is remembered by ID the first time it is seen, then its statefulset is
+   * created or scaled to the requested count.
+   *
+   * @param resourceProfileToTotalExecs desired executor count per resource profile
+   * @throws SparkException if called before `start` has set the application ID
+   */
+  def setTotalExpectedExecutors(resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = {
+    if (appId == null) {
+      throw new SparkException("setTotalExpectedExecutors called before start of allocator.")
+    }
+    for ((profile, targetCount) <- resourceProfileToTotalExecs) {
+      rpIdToResourceProfile.getOrElseUpdate(profile.id, profile)
+      setTargetExecutorsReplicaset(targetCount, appId, profile.id)
+    }
+  }
+
+  /** Always returns false, whatever `executorId` is passed. */
+  def isDeleted(executorId: String): Boolean = false
+
+  // For now just track the sets created, in the future maybe track requested 
value too.
+  val setsCreated = new mutable.HashSet[Int]()
+
+  /** Name of the statefulset for the given application ID and resource profile ID. */
+  private def setName(applicationId: String, rpid: Int): String =
+    s"spark-s-$applicationId-$rpid"
+
+  /**
+   * Brings the statefulset of a resource profile to the requested replica count, creating the
+   * statefulset the first time the profile is seen.
+   *
+   * If a set was already created for `resourceProfileId` it is scaled without waiting.
+   * Otherwise an executor pod is built with the placeholder executor ID "EXECID" and used as
+   * the pod template. Persistent volume claims produced by the executor builder are removed
+   * from the pod's volumes and passed to the statefulset as volume claim templates, renamed
+   * after the pod volume that referenced them.
+   *
+   * @param expected desired number of replicas
+   * @param applicationId application ID used for the set name and the label selector
+   * @param resourceProfileId ID of the resource profile the set serves
+   */
+  private def setTargetExecutorsReplicaset(
+      expected: Int,
+      applicationId: String,
+      resourceProfileId: Int): Unit = {
+    if (setsCreated.contains(resourceProfileId)) {
+      val statefulset = kubernetesClient.apps().statefulSets().withName(
+        setName(applicationId, resourceProfileId))
+      statefulset.scale(expected, false /* wait */)
+    } else {
+      // We need to make the new replicaset which is going to involve building
+      // a pod.
+      val executorConf = KubernetesConf.createExecutorConf(
+        conf,
+        "EXECID", // template exec IDs
+        applicationId,
+        driverPod,
+        resourceProfileId)
+      val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr,
+        kubernetesClient, rpIdToResourceProfile(resourceProfileId))
+      val executorPod = resolvedExecutorSpec.pod
+      // Start from the built pod's spec when it has one, otherwise from an empty spec.
+      val podSpecBuilder = Option(executorPod.pod.getSpec())
+        .fold(new PodSpecBuilder())(spec => new PodSpecBuilder(spec))
+      val podWithAttachedContainer: PodSpec = podSpecBuilder
+        .addToContainers(executorPod.container)
+        .build()
+
+      val meta = executorPod.pod.getMetadata()
+
+      // Resources that need to be created, volumes are per-pod which is all we care about here.
+      val resources = resolvedExecutorSpec.executorKubernetesResources
+      // We'll let PVCs be handled by the statefulset. Note user is responsible for
+      // cleaning up PVCs. Future work: integrate with KEP1847 once stabilized.
+      val dynamicVolumeClaims = resources.collect { case pvc: PersistentVolumeClaim => pvc }
+      val dynamicVolumeClaimNames: Set[String] =
+        dynamicVolumeClaims.map(_.getMetadata().getName()).toSet
+
+      // Split the pod's volumes into those backed by a dynamic claim and everything else.
+      val podVolumes = podWithAttachedContainer.getVolumes().asScala
+      val (dynamicVolumes, staticVolumes) = podVolumes.partition { v =>
+        Option(v.getPersistentVolumeClaim())
+          .exists(pvc => dynamicVolumeClaimNames.contains(pvc.getClaimName()))
+      }
+      val dynamicClaimToVolumeName = dynamicVolumes.map { v =>
+        (v.getPersistentVolumeClaim().getClaimName(), v.getName())
+      }.toMap
+      // Remove the dynamic volumes from our pod. This just mutates it. Java style API
+      podWithAttachedContainer.setVolumes(staticVolumes.asJava)
+      // Rewrite the dynamic volume names to not ref our fake EXECID.
+      val newNamedVolumes = dynamicVolumeClaims.map { claim =>
+        // Map.apply fails with "key not found: <claim>" if no pod volume references the claim.
+        val volumeName = dynamicClaimToVolumeName(claim.getMetadata().getName())
+        new PersistentVolumeClaimBuilder(claim)
+          .editMetadata()
+            .withName(volumeName)
+          .endMetadata()
+          .build()
+      }
+
+      // Create a pod template spec from the pod.
+      val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer)
+
+      val statefulSet = new io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder()
+        .withNewMetadata()
+          .withName(setName(applicationId, resourceProfileId))
+          .withNamespace(namespace)
+        .endMetadata()
+        .withNewSpec()
+          .withPodManagementPolicy("Parallel")
+          .withReplicas(expected)
+          .withNewSelector()
+            .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId)
+            .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
+            .addToMatchLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfileId.toString)
+          .endSelector()
+          .withTemplate(podTemplateSpec)
+          .addAllToVolumeClaimTemplates(newNamedVolumes.asJava)
+        .endSpec()
+        .build()
+
+      // Without a driver pod (no driver pod name configured) there is no owner to attach;
+      // the set is still deleted explicitly in stop().
+      driverPod.foreach(pod => addOwnerReference(pod, Seq(statefulSet)))
+      kubernetesClient.apps().statefulSets().create(statefulSet)
+      setsCreated += (resourceProfileId)
+    }
+  }
+
+  /**
+   * Cleans up on stop by deleting every statefulset recorded in `setsCreated`. Each delete
+   * runs inside its own `Utils.tryLogNonFatalError` call.
+   *
+   * @param applicationId application ID used to rebuild the statefulset names
+   */
+  override def stop(applicationId: String): Unit = {
+    for (rpid <- setsCreated) {
+      val statefulSetName = setName(applicationId, rpid)
+      Utils.tryLogNonFatalError {
+        kubernetesClient.apps().statefulSets().withName(statefulSetName).delete()
+      }
+    }
+  }
+}
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
index 363029e..e59772d 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala
@@ -455,9 +455,11 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite 
with BeforeAndAfter {
       ENV_EXECUTOR_MEMORY -> "1024m",
       ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID,
       ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
-      ENV_EXECUTOR_POD_IP -> null,
       ENV_SPARK_USER -> Utils.getCurrentUserName(),
-      ENV_RESOURCE_PROFILE_ID -> "0")
+      ENV_RESOURCE_PROFILE_ID -> "0",
+      // These are populated by K8s on scheduling
+      ENV_EXECUTOR_POD_IP -> null,
+      ENV_EXECUTOR_POD_NAME -> null)
 
     val extraJavaOptsStart = 
additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX))
     val extraJavaOpts = Utils.sparkJavaOpts(conf, 
SparkConf.isExecutorStartupConf)
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 5b33da6..d263bd0 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -101,6 +101,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
 
   private var podsAllocatorUnderTest: ExecutorPodsAllocator = _
 
+  val appId = "testapp"
+
   before {
     MockitoAnnotations.openMocks(this).close()
     when(kubernetesClient.pods()).thenReturn(podOperations)
@@ -209,7 +211,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))()
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
 
-    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 
(podAllocationSize + 1)))
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> (podAllocationSize + 1)))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     for (execId <- 1 until podAllocationSize) {
       snapshotsStore.updatePod(runningExecutor(execId))
@@ -230,7 +233,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
 
   test("When a current batch reaches error states immediately, re-request" +
     " them on the next batch.") {
-    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 
podAllocationSize))
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> podAllocationSize))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     for (execId <- 1 until podAllocationSize) {
       snapshotsStore.updatePod(runningExecutor(execId))
@@ -242,6 +246,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite 
with BeforeAndAfter {
     
verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 
1))
   }
 
+  test("Verify stopping deletes the labeled pods") {
+    when(podOperations
+      .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+      .thenReturn(podOperations)
+    when(podOperations
+      .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
+      .thenReturn(labeledPods)
+    podsAllocatorUnderTest.stop(TEST_SPARK_APP_ID)
+    verify(labeledPods).delete()
+  }
+
   test("When an executor is requested but the API does not report it in a 
reasonable time, retry" +
     " requesting that executor.") {
     when(podOperations
@@ -253,7 +268,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     when(podOperations
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
       .thenReturn(labeledPods)
-    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> 1))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     verify(podOperations).create(podWithAttachedContainerForId(1))
     waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
@@ -281,7 +297,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     waitForExecutorPodsClock.setTime(startTime)
 
     // Target 1 executor, make sure it's requested, even with an empty initial 
snapshot.
-    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> 1))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     verify(podOperations).create(podWithAttachedContainerForId(1))
 
@@ -293,7 +310,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
     verify(podOperations, never()).delete()
 
     // Request 3 more executors, make sure all are requested.
-    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4))
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> 4))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
     verify(podOperations).create(podWithAttachedContainerForId(2))
@@ -310,7 +328,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with 
BeforeAndAfter {
 
     // Scale down to 1. Pending executors (both acknowledged and not) should 
be deleted.
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
-    podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> 1))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
     verify(podOperations, times(4)).create(any())
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
new file mode 100644
index 0000000..ae1477e
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.mockito.{Mock, MockitoAnnotations}
+import org.mockito.Mockito.when
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark._
+import org.apache.spark.deploy.k8s.Config._
+
+/**
+ * Checks that [[KubernetesClusterManager]] can build a pods allocator for every supported
+ * value of `KUBERNETES_ALLOCATION_PODS_ALLOCATOR`, using mocked Spark and Kubernetes objects.
+ */
+class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter {
+
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var sc: SparkContext = _
+
+  @Mock
+  private var env: SparkEnv = _
+
+  @Mock
+  private var sparkConf: SparkConf = _
+
+  before {
+    MockitoAnnotations.openMocks(this).close()
+    // The context exposes the mocked conf and env; no driver pod name is configured.
+    when(sc.conf).thenReturn(sparkConf)
+    when(sc.conf.get(KUBERNETES_DRIVER_POD_NAME)).thenReturn(None)
+    when(sc.env).thenReturn(env)
+  }
+
+  test("constructing a AbstractPodsAllocator works") {
+    // Both the short aliases and the fully qualified class names must be accepted.
+    val allocatorSettings = List(
+      "statefulset",
+      "direct",
+      "org.apache.spark.scheduler.cluster.k8s.StatefulsetPodsAllocator",
+      "org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator")
+    for (setting <- allocatorSettings) {
+      when(sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)).thenReturn(setting)
+      val manager = new KubernetesClusterManager()
+      // No assertion needed: the test passes as long as construction does not throw.
+      manager.makeExecutorPodsAllocator(sc, kubernetesClient, null)
+    }
+  }
+}
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index bf17aa3..537685d 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
-import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, 
RpcEnv}
 import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, 
TaskSchedulerImpl}
 import 
org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor,
 RemoveExecutor, StopDriver}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -93,6 +93,9 @@ class KubernetesClusterSchedulerBackendSuite extends 
SparkFunSuite with BeforeAn
   @Mock
   private var pollEvents: ExecutorPodsPollingSnapshotSource = _
 
+  @Mock
+  private var context: RpcCallContext = _
+
   private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _
   private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _
 
@@ -149,7 +152,7 @@ class KubernetesClusterSchedulerBackendSuite extends 
SparkFunSuite with BeforeAn
     verify(eventQueue).stop()
     verify(watchEvents).stop()
     verify(pollEvents).stop()
-    verify(labeledPods).delete()
+    verify(podAllocator).stop(TEST_SPARK_APP_ID)
     verify(labledConfigMaps).delete()
     verify(kubernetesClient).close()
   }
@@ -250,4 +253,10 @@ class KubernetesClusterSchedulerBackendSuite extends 
SparkFunSuite with BeforeAn
     endpoint.receiveAndReply(null).apply(
       RegisterExecutor("1", null, "host1", 1, Map.empty, Map.empty, Map.empty, 
0))
   }
+
+  test("Dynamically fetch an executor ID") {
+    val endpoint = schedulerBackendUnderTest.createDriverEndpoint()
+    endpoint.receiveAndReply(context).apply(GenerateExecID("cheeseBurger"))
+    verify(context).reply("1")
+  }
 }
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetAllocatorSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetAllocatorSuite.scala
new file mode 100644
index 0000000..5f8ceb2
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetAllocatorSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.apps.StatefulSet
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl._
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.Mockito.{never, times, verify, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, 
KubernetesExecutorSpec}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
+import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT
+import org.apache.spark.resource._
+import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
+
+class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
+
+  private val driverPodName = "driver"
+
+  // Driver pod handed back by the stubbed driver pod lookup in `before`.
+  private val driverPod = new PodBuilder()
+    .withNewMetadata()
+      .withName(driverPodName)
+      .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)
+      .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
+      .withUid("driver-pod-uid")
+      .endMetadata()
+    .build()
+
+  private val conf = new SparkConf()
+    .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+    .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s")
+
+  private val defaultProfile: ResourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf)
+
+  private val secMgr = new SecurityManager(conf)
+
+  // Mockito-managed collaborators; they are (re)initialised by openMocks in `before`.
+  @Mock
+  private var kubernetesClient: KubernetesClient = _
+
+  @Mock
+  private var appOperations: AppsAPIGroupDSL = _
+
+  @Mock
+  private var statefulSetOperations: MixedOperation[
+    apps.StatefulSet, apps.StatefulSetList, RollableScalableResource[apps.StatefulSet]] = _
+
+  @Mock
+  private var editableSet: RollableScalableResource[apps.StatefulSet] = _
+
+  @Mock
+  private var podOperations: PODS = _
+
+  @Mock
+  private var driverPodOperations: PodResource[Pod] = _
+
+  private var podsAllocatorUnderTest: StatefulsetPodsAllocator = _
+
+  private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _
+
+  @Mock
+  private var executorBuilder: KubernetesExecutorBuilder = _
+
+  @Mock
+  private var schedulerBackend: KubernetesClusterSchedulerBackend = _
+
+  // Application id passed to the allocator's stop() in the cleanup test.
+  val appId = "testapp"
+
+  /**
+   * Mockito answer that reads the KubernetesExecutorConf passed as the first argument of the
+   * stubbed call and wraps executorPodWithId(0, <its resource profile id>) in a spec that has
+   * no additional resources.
+   */
+  private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
+    new Answer[KubernetesExecutorSpec] {
+      override def answer(invocation: InvocationOnMock): KubernetesExecutorSpec = {
+        val executorConf = invocation.getArgument[KubernetesExecutorConf](0)
+        val pod = executorPodWithId(0, executorConf.resourceProfileId.toInt)
+        KubernetesExecutorSpec(pod, Seq.empty)
+      }
+    }
+
+  before {
+    MockitoAnnotations.openMocks(this).close()
+
+    // Pod lookups: the driver pod name resolves to the canned driver pod.
+    when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+    when(driverPodOperations.get).thenReturn(driverPod)
+    when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod)
+
+    // Statefulset lookups: every name resolves to the single mocked resource.
+    when(kubernetesClient.apps()).thenReturn(appOperations)
+    when(appOperations.statefulSets()).thenReturn(statefulSetOperations)
+    when(statefulSetOperations.withName(any())).thenReturn(editableSet)
+
+    // Executor specs come from the canned answer rather than a real builder.
+    when(executorBuilder.buildFromFeatures(
+      any(classOf[KubernetesExecutorConf]),
+      meq(secMgr),
+      meq(kubernetesClient),
+      any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer())
+    when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty)
+
+    // All stubs are in place before the allocator is constructed and started.
+    snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
+    podsAllocatorUnderTest = new StatefulsetPodsAllocator(
+      conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, null)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+  }
+
+  test("Validate initial statefulSet creation & cleanup with two resource profiles") {
+    // Build a second, GPU-requesting profile next to the default one.
+    val rprof = new ResourceProfileBuilder()
+    val taskReq = new TaskResourceRequests().resource("gpu", 1)
+    val execReq =
+      new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia")
+    rprof.require(taskReq).require(execReq)
+    val immrprof = new ResourceProfile(rprof.executorResources, rprof.taskResources)
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> (10),
+          immrprof -> (420)))
+    // One statefulset is expected per resource profile.
+    verify(statefulSetOperations, times(2)).create(any())
+    // NOTE(review): stop() receives appId ("testapp") although start() was given
+    // TEST_SPARK_APP_ID; withName(any()) is stubbed, so this passes for either id. Confirm
+    // which id is intended.
+    podsAllocatorUnderTest.stop(appId)
+    verify(editableSet, times(2)).delete()
+  }
+
+  test("Validate statefulSet scale up") {
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> (10)))
+    // The first request must create exactly one statefulset; capture it for inspection.
+    val captor = ArgumentCaptor.forClass(classOf[StatefulSet])
+    verify(statefulSetOperations, times(1)).create(captor.capture())
+    val set = captor.getValue()
+    val namespace = set.getMetadata().getNamespace()
+    assert(namespace === "default")
+    val spec = set.getSpec()
+    assert(spec.getReplicas() === 10)
+    assert(spec.getPodManagementPolicy() === "Parallel")
+    // Executors must not be created as individual pods.
+    verify(podOperations, never()).create(any())
+    // Raising the target for the same profile must scale the existing set.
+    podsAllocatorUnderTest.setTotalExpectedExecutors(
+      Map(defaultProfile -> (20)))
+    verify(editableSet, times(1)).scale(any(), any())
+  }
+}
diff --git 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index f722471..c233bd8 100755
--- 
a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ 
b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -85,13 +85,14 @@ case "$1" in
       -Xms$SPARK_EXECUTOR_MEMORY
       -Xmx$SPARK_EXECUTOR_MEMORY
       -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"
-      org.apache.spark.executor.CoarseGrainedExecutorBackend
+      org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBackend
       --driver-url $SPARK_DRIVER_URL
       --executor-id $SPARK_EXECUTOR_ID
       --cores $SPARK_EXECUTOR_CORES
       --app-id $SPARK_APPLICATION_ID
       --hostname $SPARK_EXECUTOR_POD_IP
       --resourceProfileId $SPARK_RESOURCE_PROFILE_ID
+      --podName $SPARK_EXECUTOR_POD_NAME
     )
     ;;
 
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
index 1c12123..6db4bee 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala
@@ -16,7 +16,11 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
+import scala.collection.JavaConverters._
+
 import io.fabric8.kubernetes.api.model.Pod
+import org.scalatest.concurrent.Eventually
+import org.scalatest.matchers.should.Matchers._
 
 import org.apache.spark.TestUtils
 import org.apache.spark.launcher.SparkLauncher
@@ -25,11 +29,24 @@ private[spark] trait BasicTestsSuite { k8sSuite: 
KubernetesSuite =>
 
   import BasicTestsSuite._
   import KubernetesSuite.k8sTestTag
+  import KubernetesSuite.{TIMEOUT, INTERVAL}
 
   test("Run SparkPi with no resources", k8sTestTag) {
     runSparkPiAndVerifyCompletion()
   }
 
+  test("Run SparkPi with no resources & statefulset allocation", k8sTestTag) {
+    sparkAppConf.set("spark.kubernetes.allocation.pods.allocator", "statefulset")
+    runSparkPiAndVerifyCompletion()
+    // No statefulset may be left dangling after the run. Removal depends on garbage
+    // collection inside of K8s, so keep polling until the list is empty or TIMEOUT expires.
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      val remainingSets = kubernetesTestComponents.kubernetesClient
+        .apps()
+        .statefulSets()
+        .list()
+        .getItems
+        .asScala
+      remainingSets.size shouldBe (0)
+    }
+  }
+
   test("Run SparkPi with a very long application name.", k8sTestTag) {
     sparkAppConf.set("spark.app.name", "long" * 40)
     runSparkPiAndVerifyCompletion()
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index d65f594..06080ab 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -100,11 +100,17 @@ class KubernetesSuite extends SparkFunSuite
       .withLabel("spark-role", "executor")
       .list()
       .getItems.asScala.foreach { execPod =>
-        logInfo(s"\nBEGIN executor (${execPod.getMetadata.getName}) POD 
log:\n" +
+        val podLog = try {
           kubernetesTestComponents.kubernetesClient
             .pods()
             .withName(execPod.getMetadata.getName)
-            .getLog)
+            .getLog
+        } catch {
+          // Log retrieval is best-effort: report the client error in place of the log text.
+          case e: io.fabric8.kubernetes.client.KubernetesClientException =>
+            s"Error fetching log (pod is likely not ready) ${e}"
+        }
+        logInfo(s"\nBEGIN executor (${execPod.getMetadata.getName}) POD 
log:\n" +
+        podLog)
         logInfo(s"END executor (${execPod.getMetadata.getName}) POD log")
       }
   }
@@ -237,6 +243,28 @@ class KubernetesSuite extends SparkFunSuite
       Option((interval, None)))
   }
 
+  /**
+   * Runs SPARK_MINI_READ_WRITE_TEST through runSparkApplicationAndVerifyCompletion, passing
+   * the success message for `wordCount` as the single expected log line.
+   *
+   * @param wordCount count interpolated into the success message for both word counts
+   * @param appResource forwarded unchanged
+   * @param driverPodChecker forwarded unchanged
+   * @param executorPodChecker forwarded unchanged
+   * @param appArgs forwarded unchanged
+   * @param isJVM forwarded unchanged
+   * @param interval forwarded as `Option((interval, None))`
+   */
+  protected def runMiniReadWriteAndVerifyCompletion(
+      wordCount: Int,
+      appResource: String = containerLocalSparkDistroExamplesJar,
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      appArgs: Array[String] = Array.empty[String],
+      isJVM: Boolean = true,
+      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
+    val successMessage = s"Success! Local Word Count $wordCount and " +
+      s"D Word Count $wordCount agree."
+    runSparkApplicationAndVerifyCompletion(
+      appResource,
+      SPARK_MINI_READ_WRITE_TEST,
+      Seq(successMessage),
+      Seq(),
+      appArgs,
+      driverPodChecker,
+      executorPodChecker,
+      isJVM,
+      None,
+      Option((interval, None)))
+  }
+
   protected def runSparkRemoteCheckAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
@@ -570,6 +598,7 @@ private[spark] object KubernetesSuite {
   val MinikubeTag = Tag("minikube")
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
   val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest"
+  val SPARK_MINI_READ_WRITE_TEST = 
"org.apache.spark.examples.MiniReadWriteTest"
   val SPARK_REMOTE_MAIN_CLASS: String = 
"org.apache.spark.examples.SparkRemoteFileTest"
   val SPARK_DRIVER_MAIN_CLASS: String = 
"org.apache.spark.examples.DriverSubmissionTest"
   val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes))
diff --git 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
index 86f8cdd..0362b34 100644
--- 
a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
+++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
@@ -28,7 +28,7 @@ import 
org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
 private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
   import PVTestsSuite._
 
-  private def setupLocalStorage(): Unit = {
+  private def setupLocalStorageClass(): Unit = {
     val scBuilder = new StorageClassBuilder()
       .withKind("StorageClass")
       .withApiVersion("storage.k8s.io/v1")
@@ -37,6 +37,21 @@ private[spark] trait PVTestsSuite { k8sSuite: 
KubernetesSuite =>
       .endMetadata()
       .withProvisioner("kubernetes.io/no-provisioner")
       .withVolumeBindingMode("WaitForFirstConsumer")
+    try {
+      kubernetesTestComponents
+        .kubernetesClient
+        .storage()
+        .storageClasses()
+        .create(scBuilder.build())
+    } catch {
+      case e: io.fabric8.kubernetes.client.KubernetesClientException =>
+        // Ignore storage class creation errors: sometimes a dangling class already exists.
+    }
+  }
+
+  private def setupLocalStorage(): Unit = {
+
+    setupLocalStorageClass()
 
     val pvBuilder = new PersistentVolumeBuilder()
       .withKind("PersistentVolume")
@@ -77,12 +92,6 @@ private[spark] trait PVTestsSuite { k8sSuite: 
KubernetesSuite =>
 
     kubernetesTestComponents
       .kubernetesClient
-      .storage()
-      .storageClasses()
-      .create(scBuilder.build())
-
-    kubernetesTestComponents
-      .kubernetesClient
       .persistentVolumes()
       .create(pvBuilder.build())
 
@@ -122,6 +131,71 @@ private[spark] trait PVTestsSuite { k8sSuite: 
KubernetesSuite =>
     }
   }
 
+  test("PVs with local hostpath storage on statefulsets", k8sTestTag, MinikubeTag) {
+    // Mount the same claim on driver and executors and allocate executors via a statefulset.
+    Seq(
+      "spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path" ->
+        CONTAINER_MOUNT_PATH,
+      "spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName" ->
+        PVC_NAME,
+      "spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path" ->
+        CONTAINER_MOUNT_PATH,
+      "spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName" ->
+        PVC_NAME,
+      "spark.kubernetes.allocation.pods.allocator" -> "statefulset"
+    ).foreach { case (key, value) => sparkAppConf.set(key, value) }
+    val hostFile = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
+    val expectedWordCount = FILE_CONTENTS.split(" ").length
+    try {
+      setupLocalStorage()
+      runMiniReadWriteAndVerifyCompletion(
+        expectedWordCount,
+        driverPodChecker = doBasicDriverPodCheck,
+        executorPodChecker = doBasicExecutorPodCheck,
+        appArgs = Array(s"$CONTAINER_MOUNT_PATH/$hostFile"),
+        interval = Some(PV_TESTS_INTERVAL)
+      )
+    } finally {
+      // Storage is removed whether or not the run succeeded.
+      deleteLocalStorage()
+    }
+  }
+
+  test("PVs with local hostpath and storageClass on statefulsets", k8sTestTag, MinikubeTag) {
+    // The driver mounts PVC_NAME; executors use a separate "OnDemand" claim name together
+    // with a storage class and size limit. Executors are allocated via a statefulset.
+    Seq(
+      "spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path" ->
+        CONTAINER_MOUNT_PATH,
+      "spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName" ->
+        PVC_NAME,
+      "spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path" ->
+        CONTAINER_MOUNT_PATH,
+      "spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName" ->
+        (PVC_NAME + "OnDemand"),
+      "spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass" ->
+        "standard",
+      "spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit" ->
+        "1G",
+      "spark.kubernetes.allocation.pods.allocator" -> "statefulset"
+    ).foreach { case (key, value) => sparkAppConf.set(key, value) }
+    val hostFile = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
+    val expectedWordCount = FILE_CONTENTS.split(" ").length
+    try {
+      setupLocalStorage()
+      runMiniReadWriteAndVerifyCompletion(
+        expectedWordCount,
+        driverPodChecker = doBasicDriverPodCheck,
+        executorPodChecker = doBasicExecutorPodCheck,
+        appArgs = Array(s"$CONTAINER_MOUNT_PATH/$hostFile"),
+        interval = Some(PV_TESTS_INTERVAL)
+      )
+    } finally {
+      // Storage is removed whether or not the run succeeded.
+      deleteLocalStorage()
+    }
+  }
+
   test("PVs with local storage", k8sTestTag, MinikubeTag) {
     sparkAppConf
       
.set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to