[ https://issues.apache.org/jira/browse/SPARK-24432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726977#comment-16726977 ]

ASF GitHub Bot commented on SPARK-24432:
----------------------------------------

vanzin closed pull request #22722: [SPARK-24432][k8s] Add support for dynamic resource allocation on Kubernetes
URL: https://github.com/apache/spark/pull/22722
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java
new file mode 100644
index 0000000000000..7135d1af5facd
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.kubernetes;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.sasl.SecretKeyHolder;
+import org.apache.spark.network.shuffle.ExternalShuffleClient;
+import org.apache.spark.network.shuffle.protocol.RegisterDriver;
+import org.apache.spark.network.shuffle.protocol.ShuffleServiceHeartbeat;
+import org.apache.spark.network.util.TransportConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A client for talking to the external shuffle service in Kubernetes cluster mode.
+ *
+ * This is used by each Spark executor to register with a corresponding external
+ * shuffle service on the cluster. The purpose is to clean up shuffle files
+ * reliably if the application exits unexpectedly.
+ */
+public class KubernetesExternalShuffleClient extends ExternalShuffleClient {
+  private static final Logger logger = LoggerFactory
+    .getLogger(KubernetesExternalShuffleClient.class);
+
+  private final ScheduledExecutorService heartbeaterThread =
+    Executors.newSingleThreadScheduledExecutor(
+      new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("kubernetes-external-shuffle-client-heartbeater")
+        .build());
+
+  /**
+   * Creates a Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}.
+   * Please refer to docs on {@link ExternalShuffleClient} for more information.
+   */
+  public KubernetesExternalShuffleClient(
+      TransportConf conf,
+      SecretKeyHolder secretKeyHolder,
+      boolean saslEnabled,
+      long registrationTimeoutMs) {
+    super(conf, secretKeyHolder, saslEnabled, registrationTimeoutMs);
+  }
+
+  public void registerDriverWithShuffleService(
+      String host,
+      int port,
+      long heartbeatTimeoutMs,
+      long heartbeatIntervalMs) throws IOException, InterruptedException {
+    checkInit();
+    ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
+    TransportClient client = clientFactory.createClient(host, port);
+    client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs));
+  }
+
+  @Override
+  public void close() {
+    heartbeaterThread.shutdownNow();
+    super.close();
+  }
+
+  private class RegisterDriverCallback implements RpcResponseCallback {
+    private final TransportClient client;
+    private final long heartbeatIntervalMs;
+
+    private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) {
+      this.client = client;
+      this.heartbeatIntervalMs = heartbeatIntervalMs;
+    }
+
+    @Override
+    public void onSuccess(ByteBuffer response) {
+      heartbeaterThread.scheduleAtFixedRate(
+        new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
+      logger.info("Successfully registered app " + appId + " with external shuffle service.");
+    }
+
+    @Override
+    public void onFailure(Throwable e) {
+      logger.warn("Unable to register app " + appId + " with external shuffle 
service. " +
+        "Please manually remove shuffle data after driver exit. Error: " + e);
+    }
+  }
+
+  private class Heartbeater implements Runnable {
+
+    private final TransportClient client;
+
+    private Heartbeater(TransportClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public void run() {
+      // TODO: Stop sending heartbeats if the shuffle service has lost the app due to timeout
+      client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer());
+    }
+  }
+}
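For orientation, a minimal sketch of how the driver side is expected to wire up this client; it mirrors the KubernetesClusterManager change further down in this diff, is not part of the PR itself, uses illustrative host, port, and timeout values, and assumes it runs inside Spark where `SparkTransportConf` and `sc.env` are accessible:

```scala
// Hedged usage sketch (not part of the diff): driver-side wiring of the client,
// mirroring KubernetesClusterManager below. Host, port and timeouts are illustrative;
// `sc` is an assumed SparkContext.
import org.apache.spark.internal.config
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient

val shuffleClient = new KubernetesExternalShuffleClient(
  SparkTransportConf.fromSparkConf(sc.conf, "shuffle"),
  sc.env.securityManager,
  sc.env.securityManager.isAuthenticationEnabled(),
  sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
shuffleClient.init(sc.applicationId)
// One registration per shuffle-service pod; the client then heartbeats on a daemon thread.
shuffleClient.registerDriverWithShuffleService(
  "10.0.0.12", 7337, /* heartbeatTimeoutMs */ 120000L, /* heartbeatIntervalMs */ 10000L)
```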
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
index 60179f126bc44..3510509f20eee 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java
@@ -24,7 +24,7 @@
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
+import org.apache.spark.network.shuffle.protocol.ShuffleServiceHeartbeat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -32,7 +32,7 @@
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.shuffle.ExternalShuffleClient;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
+import org.apache.spark.network.shuffle.protocol.RegisterDriver;
 import org.apache.spark.network.util.TransportConf;
 
 /**
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index a68a297519b66..f5e51c5cd4929 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -23,8 +23,6 @@
 import io.netty.buffer.Unpooled;
 
 import org.apache.spark.network.protocol.Encodable;
-import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
-import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
 
 /**
 * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java
similarity index 91%
rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java
index d5f53ccb7f741..8e9e38ff0c720 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java
@@ -15,19 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.shuffle.protocol.mesos;
+package org.apache.spark.network.shuffle.protocol;
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
 
 import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 
 // Needed by ScalaDoc. See SPARK-7726
 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
 
 /**
- * A message sent from the driver to register with the MesosExternalShuffleService.
+ * A message sent from the driver to register with the ExternalShuffleService.
  */
 public class RegisterDriver extends BlockTransferMessage {
   private final String appId;
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java
similarity index 92%
rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java
rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java
index b30bb9aed55b6..3f6cb0d0d4798 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.spark.network.shuffle.protocol.mesos;
+package org.apache.spark.network.shuffle.protocol;
 
 import io.netty.buffer.ByteBuf;
 import org.apache.spark.network.protocol.Encoders;
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 
 // Needed by ScalaDoc. See SPARK-7726
 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
 
 /**
- * A heartbeat sent from the driver to the MesosExternalShuffleService.
+ * A heartbeat sent from the driver to the MesosExternalShuffleService and
+ * KubernetesExternalShuffleService.
  */
 public class ShuffleServiceHeartbeat extends BlockTransferMessage {
   private final String appId;
diff --git a/conf/k8s-shuffle-service.yaml.template b/conf/k8s-shuffle-service.yaml.template
new file mode 100644
index 0000000000000..5fb674b2480c3
--- /dev/null
+++ b/conf/k8s-shuffle-service.yaml.template
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: extensions/v1beta1
+kind: DaemonSet
+metadata:
+  labels:
+  #  key1: value1
+  #  key2: value2
+  namespace: spark   # change namespace according to your cluster configuration.
+  name: spark-shuffle-service
+spec:
+  template:
+    metadata:
+      labels:
+        #  key1: value1
+        #  key2: value2
+    spec:
+      # nodeSelector:
+      #  key: "value"     # change this launch the shuffle service on some 
specific nodes
+      volumes:
+        - name: shuffle-volume
+          hostPath:
+            path: '/tmp/spark-local'  # change this path according to your cluster configuration.
+      containers:
+        - name: shuffle-service
+          args: ["shuffle-service"]
+          image: kubespark/spark:spark-241   # change the image according to your cluster configuration.
+          imagePullPolicy: IfNotPresent
+          volumeMounts:
+            - mountPath: '/tmp/spark-local'   # change this path according to your cluster configuration.
+              name: shuffle-volume
+          resources:
+             requests:
+               cpu: "1"
+             limits:
+               cpu: "1"
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index f6b3c37f0fe72..64eeb54cf2f93 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -100,6 +100,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
   }
 
   def stop() {
+    blockHandler.close()
     if (server != null) {
       server.close()
       server = null
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 0fe82ac0cedc5..5ee6e8dfd4fd6 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -256,8 +256,14 @@ private[spark] class BlockManager(
     blockManagerId = if (idFromMaster != null) idFromMaster else id
 
     shuffleServerId = if (externalShuffleServiceEnabled) {
-      logInfo(s"external shuffle service port = $externalShuffleServicePort")
-      BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort)
+      val shuffleServerHostName = if (blockManagerId.isDriver) {
+        blockTransferService.hostName
+      } else {
+        conf.get("spark.shuffle.service.host", blockTransferService.hostName)
+      }
+      logInfo(s"external shuffle service host = $shuffleServerHostName, " +
+        s"port = $externalShuffleServicePort")
+      BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort)
     } else {
       blockManagerId
     }
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index b4088d79addff..9cc077621c56c 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -156,6 +156,40 @@ exits.
 
 Use the exact prefix `spark.kubernetes.authenticate` for Kubernetes authentication parameters in client mode.
 
+## Dynamic Executor Scaling
+
+To enable dynamic resource allocation, you must first launch the external shuffle service on the nodes that Spark executors
+will be created on. This is typically a [DaemonSet](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/) with a provisioned
+[hostPath](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volume. This shuffle service may be shared by executors
+belonging to different Spark jobs. Using Spark with dynamic allocation on Kubernetes assumes that a cluster administrator has set up one
+or more shuffle-service DaemonSets in the cluster.
+
+A sample configuration file is provided in `conf/k8s-shuffle-service.yaml.template`, which can be customized as needed for a particular cluster.
+It is important that `spec.template.metadata.labels` are set up appropriately for the shuffle service because there may be multiple
+shuffle service instances running in a cluster. The labels give Spark applications a way to target a particular shuffle service.
+
+Note that when dynamic resource allocation is enabled, the executors' shuffle files must be visible to the shuffle service
+located on the same node, which means the volume mount configuration of the shuffle service should be identical to that of the executors.
+
+For example, if the shuffle service we want to use is in the 'spark' namespace and has pods with the label app=spark-shuffle-service, we can use
+that label to target that particular shuffle service at job launch time. In order to run a job with dynamic allocation enabled,
+the command may then look like the following:
+
+```bash
+$ bin/spark-submit \
+    --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
+    --deploy-mode cluster \
+    --name spark-groupby \
+    --class org.apache.spark.examples.GroupByTest \
+    --conf spark.local.dir=/tmp/spark-local \
+    --conf spark.dynamicAllocation.enabled=true \
+    --conf spark.shuffle.service.enabled=true \
+    --conf spark.kubernetes.shuffle.namespace=spark \
+    --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service" \
+    --conf spark.kubernetes.container.image=<spark-image> \
+    local:///path/to/examples.jar 10 400000 2
+```
+
 ## Dependency Management
 
 If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to
@@ -391,6 +425,23 @@ specific to Spark on Kubernetes.
     Custom container image to use for executors.
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.shuffle.namespace</code></td>
+  <td><code>default</code></td>
+  <td>
+    Namespace in which the shuffle service pods are present.
+    The shuffle service must be created in the cluster prior to attempts to use it.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.kubernetes.shuffle.labels</code></td>
+  <td><code>(none)</code></td>
+  <td>
+    Labels that will be used to look up shuffle service pods.
+    This should be a comma-separated list of label key-value pairs, where each label is in the format key=value.
+    The labels chosen must match exactly one shuffle service pod on each node where executors are launched.
+  </td>
+</tr>
 <tr>
   <td><code>spark.kubernetes.container.image.pullPolicy</code></td>
   <td><code>IfNotPresent</code></td>
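The two settings above ultimately drive a label-based pod lookup on the driver. A hedged sketch of that lookup, mirroring KubernetesExternalShuffleManagerImpl further down in this diff; it is not part of the PR, the namespace and label values are illustrative, and `client` is assumed to be a fabric8 `KubernetesClient`:

```scala
// Hedged sketch (not part of the diff): how the shuffle namespace and labels
// translate into a pod lookup on the driver.
import scala.collection.JavaConverters._

val shufflePods = client.pods()
  .inNamespace("spark")                                      // spark.kubernetes.shuffle.namespace
  .withLabels(Map("app" -> "spark-shuffle-service").asJava)  // spark.kubernetes.shuffle.labels
  .list()
  .getItems
  .asScala
```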
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
index fd056bb90e0c4..e58fd27737c5c 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -85,6 +85,7 @@
         break;
       case "org.apache.spark.deploy.ExternalShuffleService":
       case "org.apache.spark.deploy.mesos.MesosExternalShuffleService":
+      case "org.apache.spark.deploy.k8s.KubernetesExternalShuffleService":
         javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS");
         javaOptsKeys.add("SPARK_SHUFFLE_OPTS");
         extraClassPath = getenv("SPARK_DAEMON_CLASSPATH");
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 71e4d321a0e3a..da0b864f0f8de 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -29,6 +29,30 @@ private[spark] object Config extends Logging {
       .stringConf
       .createWithDefault("default")
 
+  val KUBERNETES_SHUFFLE_NAMESPACE =
+    ConfigBuilder("spark.kubernetes.shuffle.namespace")
+      .doc("Namespace of the shuffle service")
+      .stringConf
+      .createWithDefault("default")
+
+  val KUBERNETES_SHUFFLE_LABELS =
+    ConfigBuilder("spark.kubernetes.shuffle.labels")
+      .doc("Labels to identify the shuffle service")
+      .stringConf
+      .createOptional
+
+  val SPARK_SHUFFLE_SERVICE_HOST =
+    ConfigBuilder("spark.shuffle.service.host")
+      .doc("Host for Spark Shuffle Service")
+      .internal()
+      .stringConf
+      .createOptional
+
+  val SHUFFLE_CLEANER_INTERVAL_S =
+    ConfigBuilder("spark.shuffle.cleaner.interval")
+      .timeConf(TimeUnit.SECONDS)
+      .createWithDefaultString("30s")
+
   val CONTAINER_IMAGE =
     ConfigBuilder("spark.kubernetes.container.image")
       .doc("Container image to use for Spark containers. Individual container 
types " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala
new file mode 100644
index 0000000000000..08cb52dbfd48a
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.nio.ByteBuffer
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver, ShuffleServiceHeartbeat}
+import org.apache.spark.network.util.TransportConf
+import org.apache.spark.util.ThreadUtils
+
+// Todo: handle duplicated code with Mesos
+
+/**
+ * An RPC endpoint that receives registration requests from Spark drivers running on Kubernetes.
+ * It detects driver termination and calls the cleanup callback to [[ExternalShuffleService]].
+ */
+private[spark] class KubernetesShuffleBlockHandler(
+    transportConf: TransportConf,
+    cleanerIntervalS: Long)
+  extends ExternalShuffleBlockHandler(transportConf, null) with Logging {
+
+  ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher")
+    .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS)
+
+  // Stores a map of app id to app state (timeout value and last heartbeat)
+  private val connectedApps = new ConcurrentHashMap[String, AppState]()
+
+  protected override def handleMessage(
+      message: BlockTransferMessage,
+      client: TransportClient,
+      callback: RpcResponseCallback): Unit = {
+    message match {
+      case RegisterDriverParam(appId, appState) =>
+        val address = client.getSocketAddress
+        val timeout = appState.heartbeatTimeout
+        logInfo(s"Received registration request from app $appId (remote 
address $address, " +
+          s"heartbeat timeout $timeout ms).")
+        if (connectedApps.containsKey(appId)) {
+          logWarning(s"Received a registration request from app $appId, but it was already " +
+            s"registered")
+        }
+        connectedApps.put(appId, appState)
+        callback.onSuccess(ByteBuffer.allocate(0))
+      case Heartbeat(appId) =>
+        val address = client.getSocketAddress
+        Option(connectedApps.get(appId)) match {
+          case Some(existingAppState) =>
+            logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' 
(remote " +
+              s"address $address).")
+            existingAppState.lastHeartbeat = System.nanoTime()
+          case None =>
+            logWarning(s"Received ShuffleServiceHeartbeat from an unknown app 
(remote " +
+              s"address $address, appId '$appId').")
+        }
+      case _ => super.handleMessage(message, client, callback)
+    }
+  }
+
+  private object Heartbeat {
+    def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId)
+  }
+
+  private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long)
+
+  /** An extractor object for matching [[RegisterDriver]] message. */
+  private object RegisterDriverParam {
+    def unapply(r: RegisterDriver): Option[(String, AppState)] =
+      Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime())))
+  }
+
+  private class CleanerThread extends Runnable {
+    override def run(): Unit = {
+      val now = System.nanoTime()
+      connectedApps.asScala.foreach { case (appId, appState) =>
+        if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) {
+          logInfo(s"Application $appId timed out. Removing shuffle files.")
+          connectedApps.remove(appId)
+          applicationRemoved(appId, true)
+        }
+      }
+    }
+  }
+}
+
+private[spark] class KubernetesExternalShuffleService(conf: SparkConf,
+    securityManager: SecurityManager)
+  extends ExternalShuffleService(conf, securityManager) {
+
+  protected override def newShuffleBlockHandler(
+      tConf: TransportConf): ExternalShuffleBlockHandler = {
+    val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S)
+    new KubernetesShuffleBlockHandler(tConf, cleanerIntervalS)
+  }
+}
+
+private[spark] object KubernetesExternalShuffleService extends Logging {
+  def main(args: Array[String]): Unit = {
+    ExternalShuffleService.main(args,
+      (conf: SparkConf, sm: SecurityManager) => new KubernetesExternalShuffleService(conf, sm))
+  }
+}
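The cleaner above compares a millisecond heartbeat timeout against `System.nanoTime()` timestamps, hence the `* 1000 * 1000` conversion. A minimal sketch of that check (not part of the PR; names mirror the handler):

```scala
// Staleness check used by the cleaner: heartbeatTimeout arrives from the driver in
// milliseconds, while the timestamps come from System.nanoTime().
def isTimedOut(nowNs: Long, lastHeartbeatNs: Long, heartbeatTimeoutMs: Long): Boolean =
  nowNs - lastHeartbeatNs > heartbeatTimeoutMs * 1000L * 1000L
```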
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index f5fae7cc8c470..dc61ee2d7e3cc 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -24,6 +24,38 @@ import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.util.Utils
 
 private[spark] object KubernetesUtils {
+  def parseKeyValuePairs(
+      maybeKeyValues: Option[String],
+      configKey: String,
+      keyValueType: String): Map[String, String] = {
+
+    maybeKeyValues.map(keyValues => {
+      keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => {
+        keyValue.split("=", 2).toSeq match {
+          case Seq(k, v) =>
+            (k, v)
+          case _ =>
+            throw new SparkException(s"Custom $keyValueType set by $configKey 
must be a" +
+              s" comma-separated list of key-value pairs, with format 
<key>=<value>." +
+              s" Got value: $keyValue. All values: $keyValues")
+        }
+      }).toMap
+    }).getOrElse(Map.empty[String, String])
+  }
+
+  def parsePrefixedKeyValuePairs(
+      sparkConf: SparkConf,
+      prefix: String,
+      configType: String): Map[String, String] = {
+    val fromPrefix = sparkConf.getAllWithPrefix(prefix)
+    fromPrefix.groupBy(_._1).foreach {
+      case (key, values) =>
+        require(values.size == 1,
+          s"Cannot have multiple values for a given $configType key, got key 
$key with" +
+            s" values $values")
+    }
+    fromPrefix.toMap
+  }
 
   /**
    * Extract and parse Spark configuration properties with a given name prefix and
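A hedged usage sketch of the `parseKeyValuePairs` helper added above (not part of the PR; the label string is illustrative):

```scala
// Parsing a shuffle-service label selector string into a Map.
val labels = KubernetesUtils.parseKeyValuePairs(
  Some("app=spark-shuffle-service,component=shuffle"),
  "spark.kubernetes.shuffle.labels",
  "shuffle-labels")
// labels == Map("app" -> "spark-shuffle-service", "component" -> "shuffle")
```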
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
index ff5ad6673b309..fc75f36721c49 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala
@@ -174,17 +174,17 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube
    *         which may be empty if the user-specified credential is empty.
    */
   private def resolveSecretData(
-    userSpecifiedCredential: Option[String],
-    secretName: String): Map[String, String] = {
+      userSpecifiedCredential: Option[String],
+      secretName: String): Map[String, String] = {
     userSpecifiedCredential.map { valueBase64 =>
       Map(secretName -> valueBase64)
     }.getOrElse(Map.empty[String, String])
   }
 
   private def resolveSecretLocation(
-    mountedUserSpecified: Option[String],
-    valueMountedFromSubmitter: Option[String],
-    mountedCanonicalLocation: String): Option[String] = {
+      mountedUserSpecified: Option[String],
+      valueMountedFromSubmitter: Option[String],
+      mountedCanonicalLocation: String): Option[String] = {
     mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ =>
       mountedCanonicalLocation
     })
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala
new file mode 100644
index 0000000000000..3267c7a547806
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.features
+
+import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder}
+import org.apache.commons.io.FilenameUtils
+
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf}
+import org.apache.spark.util.Utils
+
+class ExternalShuffleLocalDirsFeatureStep(
+    conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) extends LocalDirsFeatureStep(conf) {
+
+  private val resolvedLocalDirs = Utils.getConfiguredLocalDirs(conf.sparkConf)
+
+  override def getDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
+    // TODO: Using hostPath for the local directory will also make it such that the
+    // other uses of the local directory - broadcasting and caching - will also write
+    // to the directory that the shuffle service is aware of. It would be better for
+    // these directories to be separate so that the lifetime of the non-shuffle scratch
+    // space is tied to an emptyDir instead of the hostPath. This requires a change in
+    // core Spark as well.
+    resolvedLocalDirs.zipWithIndex.map {
+      case (shuffleDir, shuffleDirIndex) =>
+        val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}"
+        val volume = new VolumeBuilder()
+          .withName(volumeName)
+          .withNewHostPath(shuffleDir)
+          .build()
+        val volumeMount = new VolumeMountBuilder()
+          .withName(volumeName)
+          .withMountPath(shuffleDir)
+          .build()
+        (volume, volumeMount)
+    }
+  }
+}
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
index be386e119d465..9d1bbb2ac970a 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala
@@ -16,12 +16,11 @@
  */
 package org.apache.spark.deploy.k8s.features
 
-import java.nio.file.Paths
 import java.util.UUID
 
-import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder}
+import io.fabric8.kubernetes.api.model._
 
-import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod}
 import org.apache.spark.deploy.k8s.Config._
 
 private[spark] class LocalDirsFeatureStep(
@@ -40,10 +39,10 @@ private[spark] class LocalDirsFeatureStep(
     .split(",")
   private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS)
 
-  override def configurePod(pod: SparkPod): SparkPod = {
+  def getDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = {
     val localDirVolumes = resolvedLocalDirs
       .zipWithIndex
-      .map { case (localDir, index) =>
+      .map { case (_, index) =>
         new VolumeBuilder()
           .withName(s"spark-local-dir-${index + 1}")
           .withNewEmptyDir()
@@ -59,9 +58,14 @@ private[spark] class LocalDirsFeatureStep(
           .withMountPath(localDirPath)
           .build()
       }
+    localDirVolumes.zip(localDirVolumeMounts)
+  }
+
+  override def configurePod(pod: SparkPod): SparkPod = {
+    val volumesWithMounts = getDirVolumesWithMounts()
     val podWithLocalDirVolumes = new PodBuilder(pod.pod)
       .editSpec()
-        .addToVolumes(localDirVolumes: _*)
+        .addToVolumes(volumesWithMounts.map(_._1): _*)
         .endSpec()
       .build()
     val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container)
@@ -69,7 +73,7 @@ private[spark] class LocalDirsFeatureStep(
         .withName("SPARK_LOCAL_DIRS")
         .withValue(resolvedLocalDirs.mkString(","))
         .endEnv()
-      .addToVolumeMounts(localDirVolumeMounts: _*)
+      .addToVolumeMounts(volumesWithMounts.map(_._2): _*)
       .build()
     SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts)
   }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 9999c62c878df..7e6524cd15534 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -23,10 +23,13 @@ import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.client.Config
 
 import org.apache.spark.SparkContext
-import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
+import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.internal.Logging
+import org.apache.spark.deploy.k8s.features.{ExternalShuffleLocalDirsFeatureStep, LocalDirsFeatureStep}
+import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
 import org.apache.spark.util.{SystemClock, ThreadUtils}
 
@@ -79,15 +82,38 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
     val removedExecutorsCache = CacheBuilder.newBuilder()
       .expireAfterWrite(3, TimeUnit.MINUTES)
       .build[java.lang.Long, java.lang.Long]()
+
+    val kubernetesShuffleManager = if (sc.conf.get(
+        org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) {
+      val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClient(
+        SparkTransportConf.fromSparkConf(sc.conf, "shuffle"),
+        sc.env.securityManager,
+        sc.env.securityManager.isAuthenticationEnabled(),
+        sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
+      Some(new KubernetesExternalShuffleManagerImpl(
+        sc.conf,
+        kubernetesClient,
+        kubernetesExternalShuffleClient))
+    } else None
+
+    val localDirsFeatureStepFactory: KubernetesConf[_ <: KubernetesRoleSpecificConf]
+        => LocalDirsFeatureStep = if (kubernetesShuffleManager.isDefined) {
+      new ExternalShuffleLocalDirsFeatureStep(_)
+    } else {
+      new LocalDirsFeatureStep(_)
+    }
+
+    val kubernetesExecutorBuilder = new KubernetesExecutorBuilder(
+      provideLocalDirsStep = localDirsFeatureStepFactory)
     val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager(
       sc.conf,
-      new KubernetesExecutorBuilder(),
+      kubernetesExecutorBuilder,
       kubernetesClient,
       snapshotsStore,
       removedExecutorsCache)
 
     val executorPodsAllocator = new ExecutorPodsAllocator(
-      sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock())
+      sc.conf, kubernetesExecutorBuilder, kubernetesClient, snapshotsStore, new SystemClock())
 
     val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource(
       snapshotsStore,
@@ -107,7 +133,8 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit
       executorPodsAllocator,
       executorPodsLifecycleEventHandler,
       podsWatchEventSource,
-      podsPollingEventSource)
+      podsPollingEventSource,
+      kubernetesShuffleManager)
   }
 
   override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index fa6dc2c479bbf..f134bee60705b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -19,12 +19,15 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.concurrent.ExecutorService
 
 import io.fabric8.kubernetes.client.KubernetesClient
+import scala.collection.JavaConverters._
 import scala.concurrent.{ExecutionContext, Future}
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.deploy.k8s.Constants._
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv}
 import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 private[spark] class KubernetesClusterSchedulerBackend(
@@ -36,7 +39,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     podAllocator: ExecutorPodsAllocator,
     lifecycleEventHandler: ExecutorPodsLifecycleManager,
     watchEvents: ExecutorPodsWatchSnapshotSource,
-    pollEvents: ExecutorPodsPollingSnapshotSource)
+    pollEvents: ExecutorPodsPollingSnapshotSource,
+    kubernetesShuffleManager: Option[KubernetesExternalShuffleManager])
   extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {
 
   private implicit val requestExecutorContext = ExecutionContext.fromExecutorService(
@@ -61,6 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     if (!Utils.isDynamicAllocationEnabled(conf)) {
       podAllocator.setTotalExpectedExecutors(initialExecutors)
     }
+    kubernetesShuffleManager.foreach(_.start(applicationId()))
     lifecycleEventHandler.start(this)
     podAllocator.start(applicationId())
     watchEvents.start(applicationId())
@@ -93,6 +98,10 @@ private[spark] class KubernetesClusterSchedulerBackend(
       ThreadUtils.shutdown(requestExecutorsService)
     }
 
+    Utils.tryLogNonFatalError {
+      kubernetesShuffleManager.foreach(_.stop())
+    }
+
     Utils.tryLogNonFatalError {
       kubernetesClient.close()
     }
@@ -136,6 +145,46 @@ private[spark] class KubernetesClusterSchedulerBackend(
       // to be deleted eventually.
       addressToExecutorId.get(rpcAddress).foreach(disableExecutor)
     }
+
+    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+      new PartialFunction[Any, Unit]() {
+        override def isDefinedAt(msg: Any): Boolean = {
+          msg match {
+            case RetrieveSparkAppConfig =>
+              kubernetesShuffleManager.isDefined
+            case _ => false
+          }
+        }
+
+        override def apply(msg: Any): Unit = {
+          msg match {
+            case RetrieveSparkAppConfig if kubernetesShuffleManager.isDefined =>
+              val senderAddress = context.senderAddress
+              val allExecutorPods = kubernetesClient
+                .pods()
+                .withLabel(SPARK_APP_ID_LABEL, applicationId())
+                .list()
+                .getItems
+                .asScala
+              val executor = allExecutorPods.find(pod =>
+                pod.getStatus.getPodIP.equals(senderAddress.host) ||
+                  pod.getSpec.getHostname.equals(senderAddress.host))
+              if (executor.isDefined) {
+                val shuffleSpecificProperties = kubernetesShuffleManager.get
+                  .getShuffleServiceConfigurationForExecutor(executor.get)
+                val reply = SparkAppConfig(
+                  sparkProperties ++ shuffleSpecificProperties,
+                  SparkEnv.get.securityManager.getIOEncryptionKey(),
+                  fetchHadoopDelegationTokens())
+                context.reply(reply)
+              } else {
+                logError(s"Got RetrieveSparkAppConfig message from unknown 
executor" +
+                  s" address $senderAddress")
+              }
+          }
+        }
+      }.orElse(super.receiveAndReply(context))
+    }
   }
 
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
index 364b6fb367722..d48a66d373400 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala
@@ -21,7 +21,7 @@ import org.apache.spark.deploy.k8s.features._
 import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep}
 
 private[spark] class KubernetesExecutorBuilder(
-    provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf])
+    provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf])
       => BasicExecutorFeatureStep =
       new BasicExecutorFeatureStep(_),
     provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf])
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala
new file mode 100644
index 0000000000000..13745267a7832
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.scheduler.cluster.k8s
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.internal.readiness.Readiness
+import scala.collection.JavaConverters._
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.KubernetesUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient
+
+private[spark] trait KubernetesExternalShuffleManager {
+  def start(appId: String): Unit
+
+  def stop(): Unit
+
+  /**
+   * Returns the properties that should be applied for this executor pod, given that
+   * this executor will need to communicate with an external shuffle service.
+   *
+   * In practice, this seq will always have a size of 1, but since this method's semantics are that
+   * the returned values are key-value pairs to apply as properties, it is clearer to express
+   * this as a collection.
+   */
+  def getShuffleServiceConfigurationForExecutor(executorPod: Pod): Seq[(String, String)]
+}
+
+private[spark] class KubernetesExternalShuffleManagerImpl(
+    sparkConf: SparkConf,
+    client: KubernetesClient,
+    shuffleClient: KubernetesExternalShuffleClient)
+  extends KubernetesExternalShuffleManager with Logging {
+
+  private val shuffleNamespace = sparkConf.get(KUBERNETES_SHUFFLE_NAMESPACE)
+  private val shufflePodLabels = KubernetesUtils.parseKeyValuePairs(
+    sparkConf.get(KUBERNETES_SHUFFLE_LABELS),
+    KUBERNETES_SHUFFLE_LABELS.key,
+    "shuffle-labels")
+  if (shufflePodLabels.isEmpty) {
+    throw new SparkException(s"Dynamic allocation enabled " +
+      s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified")
+  }
+  private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337)
+  private val shufflePodCache = scala.collection.mutable.Map[String, String]()
+  private var watcher: Watch = _
+
+  override def start(appId: String): Unit = {
+    // seed the initial cache.
+    val pods = client.pods()
+      .inNamespace(shuffleNamespace)
+      .withLabels(shufflePodLabels.asJava)
+      .list()
+    pods.getItems.asScala.foreach {
+      pod =>
+        if (Readiness.isReady(pod)) {
+          addShufflePodToCache(pod)
+        } else {
+          logWarning(s"Found unready shuffle pod ${pod.getMetadata.getName} " +
+            s"on node ${pod.getSpec.getNodeName}")
+        }
+    }
+
+    watcher = client
+      .pods()
+      .inNamespace(shuffleNamespace)
+      .withLabels(shufflePodLabels.asJava)
+      .watch(new Watcher[Pod] {
+        override def eventReceived(action: Watcher.Action, p: Pod): Unit = {
+          action match {
+            case Action.DELETED | Action.ERROR =>
+              shufflePodCache.remove(p.getSpec.getNodeName)
+            case Action.ADDED | Action.MODIFIED if Readiness.isReady(p) =>
+              addShufflePodToCache(p)
+          }
+        }
+        override def onClose(e: KubernetesClientException): Unit = {}
+      })
+    shuffleClient.init(appId)
+  }
+
+  private def addShufflePodToCache(pod: Pod): Unit = shufflePodCache.synchronized {
+    if (shufflePodCache.contains(pod.getSpec.getNodeName)) {
+      val registeredPodName = shufflePodCache(pod.getSpec.getNodeName)
+      if (registeredPodName.equals(pod.getStatus.getPodIP)) {
+        logWarning(s"The same pod $registeredPodName is added again on 
${pod.getSpec.getNodeName}")
+      } else {
+        throw new SparkException(s"Ambiguous specification of shuffle service 
pod. " +
+          s"Found multiple matching pods: ${pod.getMetadata.getName}, " +
+          s"$registeredPodName on ${pod.getSpec.getNodeName}")
+      }
+    } else {
+      shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP
+    }
+  }
+
+  override def stop(): Unit = {
+    watcher.close()
+    shuffleClient.close()
+  }
+
+  override def getShuffleServiceConfigurationForExecutor(executorPod: Pod)
+      : Seq[(String, String)] = {
+    val nodeName = executorPod.getSpec.getNodeName
+    val shufflePodIp = shufflePodCache.synchronized {
+      shufflePodCache.getOrElse(nodeName,
+        throw new SparkException(s"Unable to find shuffle pod on node 
$nodeName"))
+    }
+    // Inform the shuffle pod about this application so it can watch.
+    shuffleClient.registerDriverWithShuffleService(
+      shufflePodIp,
+      externalShufflePort,
+      sparkConf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs",
+      s"${sparkConf.getTimeAsSeconds("spark.network.timeout", "120s")}s"),
+      sparkConf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
+    )
+    Seq((SPARK_SHUFFLE_SERVICE_HOST.key, shufflePodIp))
+  }
+}
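Because `KubernetesExternalShuffleManager` is a small trait, a stub implementation is easy to sketch, for example for tests that do not need a live shuffle service. The `FixedHostShuffleManager` below is hypothetical and not part of the PR:

```scala
// Hedged sketch: a trivial manager that points every executor at a fixed shuffle host.
import io.fabric8.kubernetes.api.model.Pod

class FixedHostShuffleManager(host: String) extends KubernetesExternalShuffleManager {
  override def start(appId: String): Unit = ()
  override def stop(): Unit = ()
  override def getShuffleServiceConfigurationForExecutor(executorPod: Pod): Seq[(String, String)] =
    Seq(("spark.shuffle.service.host", host))
}
```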
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index 52e7a12dbaf06..9e935944ae106 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -95,7 +95,8 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
       podAllocator,
       lifecycleEventHandler,
       watchEvents,
-      pollEvents) {
+      pollEvents,
+      None) {
       override def applicationId(): String = TEST_SPARK_APP_ID
     }
   }
diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
index 216e8fe31becb..12b72848c3637 100755
--- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
+++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh
@@ -38,7 +38,7 @@ fi
 
 SPARK_K8S_CMD="$1"
 case "$SPARK_K8S_CMD" in
-    driver | driver-py | driver-r | executor)
+    driver | driver-py | driver-r | executor | shuffle-service)
       shift 1
       ;;
     "")
@@ -123,7 +123,13 @@ case "$SPARK_K8S_CMD" in
       --hostname $SPARK_EXECUTOR_POD_IP
     )
     ;;
-
+  shuffle-service)
+    CMD=(
+      ${JAVA_HOME}/bin/java
+      -cp "$SPARK_CLASSPATH"
+      org.apache.spark.deploy.k8s.KubernetesExternalShuffleService
+    )
+    ;;
   *)
     echo "Unknown command: $SPARK_K8S_CMD" 1>&2
     exit 1
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala
new file mode 100644
index 0000000000000..c44b7c9c01e7d
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.util
+
+import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder}
+import io.fabric8.kubernetes.api.model.extensions.DaemonSetBuilder
+import scala.collection.JavaConverters._
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag
+
+private[spark] trait DynamicAllocationTestSuite { k8sSuite: KubernetesSuite =>
+  import DynamicAllocationTestSuite._
+
+  private def startShuffleService(): Unit = {
+    val volume = new VolumeBuilder()
+      .withName(VOLUME_NAME)
+      .withNewHostPath(EXECUTOR_LOCAL_DIR)
+      .build()
+    val volumeMount = new VolumeMountBuilder()
+      .withName(VOLUME_NAME)
+      .withMountPath(EXECUTOR_LOCAL_DIR)
+      .build()
+    val daemonSet = new DaemonSetBuilder()
+      .withNewMetadata()
+        .withName(SHUFFLE_SERVICE_NAME)
+        .withLabels(SHUFFLE_SERVICE_LABEL)
+        .endMetadata()
+      .withNewSpec()
+        .withNewTemplate()
+          .withNewMetadata()
+            .withLabels(SHUFFLE_SERVICE_LABEL)
+            .endMetadata()
+          .withNewSpec()
+            .addNewContainer()
+              .withName("k8s-shuffle-service")
+              .withImage(image)
+              .withArgs("shuffle-service")
+              .withVolumeMounts(volumeMount)
+              .endContainer()
+            .withVolumes(volume)
+            .endSpec()
+          .endTemplate()
+        .endSpec()
+      .build()
+    kubernetesTestComponents.kubernetesClient
+      .extensions()
+      .daemonSets()
+      .inNamespace(kubernetesTestComponents.namespace)
+      .create(daemonSet)
+  }
+
+  private def stopShuffleService(): Unit = {
+    kubernetesTestComponents.kubernetesClient
+      .extensions()
+      .daemonSets()
+      .inNamespace(kubernetesTestComponents.namespace)
+      .withName(SHUFFLE_SERVICE_NAME)
+      .delete()
+  }
+
+  test("Run in cluster mode with dynamic allocation.", k8sTestTag) {
+    startShuffleService()
+    try {
+      runAndVerifyCompletion()
+    } finally {
+      stopShuffleService()
+    }
+  }
+
+  private def runAndVerifyCompletion(): Unit = {
+    sparkAppConf
+      .set("spark.local.dir", EXECUTOR_LOCAL_DIR)
+      .set("spark.kubernetes.shuffle.namespace", 
kubernetesTestComponents.namespace)
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.shuffle.service.enabled", "true")
+      .set("spark.dynamicAllocation.maxExecutors", "4")
+      .set("spark.kubernetes.shuffle.labels", "app=k8s-shuffle-service")
+    runSparkApplicationAndVerifyCompletion(
+      containerLocalSparkDistroExamplesJar,
+      SPARK_GROUPBY_MAIN_CLASS,
+      Seq("RESULT: "),
+      Array("10", "400000", "2"),
+      doBasicDriverPodCheck,
+      doBasicExecutorPodCheck,
+      appLocator,
+      isJVM = true
+    )
+  }
+}
+
+private[spark] object DynamicAllocationTestSuite {
+  val SPARK_GROUPBY_MAIN_CLASS = "org.apache.spark.examples.SimpleSkewedGroupByTest"
+  val SHUFFLE_SERVICE_NAME = "k8s-external-shuffle-service"
+  val VOLUME_NAME = "shuffle-dir"
+  val EXECUTOR_LOCAL_DIR = "/tmp/spark-local"
+  val SHUFFLE_SERVICE_LABEL: util.Map[String, String] = Map("app" -> 
"k8s-shuffle-service").asJava
+}
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index c99a907f98d0a..03164ebe604ff 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -38,12 +38,12 @@ import org.apache.spark.internal.Logging
 
 private[spark] class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite
+  with PythonTestsSuite with ClientModeTestsSuite with DynamicAllocationTestSuite
   with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
 
-  private var sparkHomeDir: Path = _
+  protected var sparkHomeDir: Path = _
   private var pyImage: String = _
   private var rImage: String = _
 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 5615d6173eebd..fe8290257c17b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -120,7 +120,7 @@ private[spark] object SparkAppLauncher extends Logging {
         appConf.toStringArray :+ appArguments.mainAppResource
 
     if (appArguments.appArgs.nonEmpty) {
-      commandLine += appArguments.appArgs.mkString(" ")
+      commandLine ++= appArguments.appArgs
     }
     logInfo(s"Launching a spark app with command line: 
${commandLine.mkString(" ")}")
     ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
index 6494cbc18f33e..af85aae216784 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -16,7 +16,6 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
 
-import java.io.File
 import java.nio.file.Paths
 
 import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
index 859aa836a3157..6d94b9efd1d29 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -28,8 +28,7 @@ import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
-import org.apache.spark.network.shuffle.protocol.BlockTransferMessage
-import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat}
+import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver, ShuffleServiceHeartbeat}
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.util.ThreadUtils
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for dynamic resource allocation
> -------------------------------------------
>
>                 Key: SPARK-24432
>                 URL: https://issues.apache.org/jira/browse/SPARK-24432
>             Project: Spark
>          Issue Type: New Feature
>          Components: Kubernetes
>    Affects Versions: 2.4.0, 3.0.0
>            Reporter: Yinan Li
>            Priority: Major
>
> This is an umbrella ticket for work on adding support for dynamic resource 
> allocation into the Kubernetes mode. This requires a Kubernetes-specific 
> external shuffle service. The feature is available in our fork at 
> github.com/apache-spark-on-k8s/spark.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
