[ https://issues.apache.org/jira/browse/SPARK-24432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16726977#comment-16726977 ]
ASF GitHub Bot commented on SPARK-24432: ---------------------------------------- vanzin closed pull request #22722: [SPARK-24432][k8s] Add support for dynamic resource allocation on Kubernetes URL: https://github.com/apache/spark/pull/22722 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java new file mode 100644 index 0000000000000..7135d1af5facd --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/kubernetes/KubernetesExternalShuffleClient.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.shuffle.kubernetes; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.spark.network.client.RpcResponseCallback; +import org.apache.spark.network.client.TransportClient; +import org.apache.spark.network.sasl.SecretKeyHolder; +import org.apache.spark.network.shuffle.ExternalShuffleClient; +import org.apache.spark.network.shuffle.protocol.RegisterDriver; +import org.apache.spark.network.shuffle.protocol.ShuffleServiceHeartbeat; +import org.apache.spark.network.util.TransportConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A client for talking to the external shuffle service in Kubernetes cluster mode. + * + * This is used by the each Spark executor to register with a corresponding external + * shuffle service on the cluster. The purpose is for cleaning up shuffle files + * reliably if the application exits unexpectedly. + */ +public class KubernetesExternalShuffleClient extends ExternalShuffleClient { + private static final Logger logger = LoggerFactory + .getLogger(KubernetesExternalShuffleClient.class); + + private final ScheduledExecutorService heartbeaterThread = + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("kubernetes-external-shuffle-client-heartbeater") + .build()); + + /** + * Creates a Kubernetes external shuffle client that wraps the {@link ExternalShuffleClient}. + * Please refer to docs on {@link ExternalShuffleClient} for more information. 
+ */ + public KubernetesExternalShuffleClient( + TransportConf conf, + SecretKeyHolder secretKeyHolder, + boolean saslEnabled, + long registrationTimeoutMs) { + super(conf, secretKeyHolder, saslEnabled, registrationTimeoutMs); + } + + public void registerDriverWithShuffleService( + String host, + int port, + long heartbeatTimeoutMs, + long heartbeatIntervalMs) throws IOException, InterruptedException { + checkInit(); + ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer(); + TransportClient client = clientFactory.createClient(host, port); + client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs)); + } + + @Override + public void close() { + heartbeaterThread.shutdownNow(); + super.close(); + } + + private class RegisterDriverCallback implements RpcResponseCallback { + private final TransportClient client; + private final long heartbeatIntervalMs; + + private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) { + this.client = client; + this.heartbeatIntervalMs = heartbeatIntervalMs; + } + + @Override + public void onSuccess(ByteBuffer response) { + heartbeaterThread.scheduleAtFixedRate( + new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS); + logger.info("Successfully registered app " + appId + " with external shuffle service."); + } + + @Override + public void onFailure(Throwable e) { + logger.warn("Unable to register app " + appId + " with external shuffle service. " + + "Please manually remove shuffle data after driver exit. 
Error: " + e); + } + } + + private class Heartbeater implements Runnable { + + private final TransportClient client; + + private Heartbeater(TransportClient client) { + this.client = client; + } + + @Override + public void run() { + // TODO: Stop sending heartbeats if the shuffle service has lost the app due to timeout + client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer()); + } + } +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java index 60179f126bc44..3510509f20eee 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/mesos/MesosExternalShuffleClient.java @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat; +import org.apache.spark.network.shuffle.protocol.ShuffleServiceHeartbeat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,7 +32,7 @@ import org.apache.spark.network.client.TransportClient; import org.apache.spark.network.sasl.SecretKeyHolder; import org.apache.spark.network.shuffle.ExternalShuffleClient; -import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver; +import org.apache.spark.network.shuffle.protocol.RegisterDriver; import org.apache.spark.network.util.TransportConf; /** diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java index a68a297519b66..f5e51c5cd4929 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java +++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java @@ -23,8 +23,6 @@ import io.netty.buffer.Unpooled; import org.apache.spark.network.protocol.Encodable; -import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver; -import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat; /** * Messages handled by the {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler}, or diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java similarity index 91% rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java index d5f53ccb7f741..8e9e38ff0c720 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/RegisterDriver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/RegisterDriver.java @@ -15,19 +15,18 @@ * limitations under the License. */ -package org.apache.spark.network.shuffle.protocol.mesos; +package org.apache.spark.network.shuffle.protocol; import com.google.common.base.Objects; import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; // Needed by ScalaDoc. See SPARK-7726 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** - * A message sent from the driver to register with the MesosExternalShuffleService. + * A message sent from the driver to register with the ExternalShuffleService. 
*/ public class RegisterDriver extends BlockTransferMessage { private final String appId; diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java similarity index 92% rename from common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java rename to common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java index b30bb9aed55b6..3f6cb0d0d4798 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/mesos/ShuffleServiceHeartbeat.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ShuffleServiceHeartbeat.java @@ -15,17 +15,17 @@ * limitations under the License. */ -package org.apache.spark.network.shuffle.protocol.mesos; +package org.apache.spark.network.shuffle.protocol; import io.netty.buffer.ByteBuf; import org.apache.spark.network.protocol.Encoders; -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage; // Needed by ScalaDoc. See SPARK-7726 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type; /** - * A heartbeat sent from the driver to the MesosExternalShuffleService. + * A heartbeat sent from the driver to the MesosExternalShuffleService and + * KubernetesExternalShuffleService. */ public class ShuffleServiceHeartbeat extends BlockTransferMessage { private final String appId; diff --git a/conf/k8s-shuffle-service.yaml.template b/conf/k8s-shuffle-service.yaml.template new file mode 100644 index 0000000000000..5fb674b2480c3 --- /dev/null +++ b/conf/k8s-shuffle-service.yaml.template @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +apiVersion: extensions/v1beta1 +kind: DaemonSet +metadata: + labels: + # key1: value1 + # key2: value2 + namespace: spark # change namespace according to your cluster configuration. + name: spark-shuffle-service +spec: + template: + metadata: + labels: + # key1: value1 + # key2: value2 + spec: + # nodeSelector: + # key: "value" # change this launch the shuffle service on some specific nodes + volumes: + - name: shuffle-volume + hostPath: + path: '/tmp/spark-local' # change this path according to your cluster configuration. + containers: + - name: shuffle-service + args: ["shuffle-service"] + image: kubespark/spark:spark-241 # change the image according to your cluster configuration. + imagePullPolicy: IfNotPresent + volumeMounts: + - mountPath: '/tmp/spark-local' # change this path according to your cluster configuration. 
+ name: shuffle-volume + resources: + requests: + cpu: "1" + limits: + cpu: "1" \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index f6b3c37f0fe72..64eeb54cf2f93 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -100,6 +100,7 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana } def stop() { + blockHandler.close() if (server != null) { server.close() server = null diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 0fe82ac0cedc5..5ee6e8dfd4fd6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -256,8 +256,14 @@ private[spark] class BlockManager( blockManagerId = if (idFromMaster != null) idFromMaster else id shuffleServerId = if (externalShuffleServiceEnabled) { - logInfo(s"external shuffle service port = $externalShuffleServicePort") - BlockManagerId(executorId, blockTransferService.hostName, externalShuffleServicePort) + val shuffleServerHostName = if (blockManagerId.isDriver) { + blockTransferService.hostName + } else { + conf.get("spark.shuffle.service.host", blockTransferService.hostName) + } + logInfo(s"external shuffle service host = $shuffleServerHostName, " + + s"port = $externalShuffleServicePort") + BlockManagerId(executorId, shuffleServerHostName, externalShuffleServicePort) } else { blockManagerId } diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index b4088d79addff..9cc077621c56c 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -156,6 +156,40 @@ exits. 
Use the exact prefix `spark.kubernetes.authenticate` for Kubernetes authentication parameters in client mode. +## Dynamic Executor Scaling + +To enable dynamic resource allocation, you need to first launch the external shuffle service on the nodes that Spark executors will +be created on. This is typically a [DaemonSet](https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/) with a provisioned +[hostPath](https://kubernetes.io/docs/concepts/storage/volumes/#hostpath) volume. This shuffle service may be shared by executors +belonging to different Spark jobs. Using Spark with dynamic allocation on Kubernetes assumes that a cluster administrator has set up one +or more shuffle-service DaemonSets in the cluster. + +A sample configuration file is provided in `conf/k8s-shuffle-service.yaml.template` which can be customized as needed for a particular cluster. +It is important to ensure that `spec.template.metadata.labels` are set up appropriately for the shuffle service because there may be multiple +shuffle service instances running in a cluster. The labels give Spark applications a way to target a particular shuffle service. + +Please note that when enabling dynamic resource allocation, the executors' shuffle files must be visible to the shuffle service +located on the same node, which means the volume mount configuration of the shuffle service should be identical to that of the Spark executors. + +For example, if the shuffle service we want to use is in the 'spark' namespace and has pods with the label app=spark-shuffle-service, we can use +those labels to target that particular shuffle service at job launch time.
In order to run a job with dynamic allocation enabled, +the command may then look like the following: + +```bash +$ bin/spark-submit \ + --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \ + --deploy-mode cluster \ + --name spark-groupby \ + --class org.apache.spark.examples.GroupByTest \ + --conf spark.local.dir=/tmp/spark-local \ + --conf spark.dynamicAllocation.enabled=true \ + --conf spark.shuffle.service.enabled=true \ + --conf spark.kubernetes.shuffle.namespace=spark \ + --conf spark.kubernetes.shuffle.labels="app=spark-shuffle-service" \ + --conf spark.kubernetes.container.image=<spark-image> \ + local:///path/to/examples.jar 10 400000 2 +``` + ## Dependency Management If your application's dependencies are all hosted in remote locations like HDFS or HTTP servers, they may be referred to @@ -391,6 +425,23 @@ specific to Spark on Kubernetes. Custom container image to use for executors. </td> </tr> +<tr> + <td><code>spark.kubernetes.shuffle.namespace</code></td> + <td><code>default</code></td> + <td> + Namespace in which the shuffle service pods are present. + The shuffle service must be created in the cluster prior to attempts to use it. + </td> +</tr> +<tr> + <td><code>spark.kubernetes.shuffle.labels</code></td> + <td><code>(none)</code></td> + <td> + Labels that will be used to look up shuffle service pods. + This should be a comma-separated list of label key-value pairs, where each label is in the format key=value. + The labels chosen must be such that they match exactly one shuffle service pod on each node where executors are launched.
+ </td> +</tr> <tr> <td><code>spark.kubernetes.container.image.pullPolicy</code></td> <td><code>IfNotPresent</code></td> diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java index fd056bb90e0c4..e58fd27737c5c 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -85,6 +85,7 @@ break; case "org.apache.spark.deploy.ExternalShuffleService": case "org.apache.spark.deploy.mesos.MesosExternalShuffleService": + case "org.apache.spark.deploy.k8s.KubernetesExternalShuffleService": javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); extraClassPath = getenv("SPARK_DAEMON_CLASSPATH"); diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 71e4d321a0e3a..da0b864f0f8de 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -29,6 +29,30 @@ private[spark] object Config extends Logging { .stringConf .createWithDefault("default") + val KUBERNETES_SHUFFLE_NAMESPACE = + ConfigBuilder("spark.kubernetes.shuffle.namespace") + .doc("Namespace of the shuffle service") + .stringConf + .createWithDefault("default") + + val KUBERNETES_SHUFFLE_LABELS = + ConfigBuilder("spark.kubernetes.shuffle.labels") + .doc("Labels to identify the shuffle service") + .stringConf + .createOptional + + val SPARK_SHUFFLE_SERVICE_HOST = + ConfigBuilder("spark.shuffle.service.host") + .doc("Host for Spark Shuffle Service") + .internal() + .stringConf + .createOptional + + val SHUFFLE_CLEANER_INTERVAL_S = + ConfigBuilder("spark.shuffle.cleaner.interval") 
+ .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("30s") + val CONTAINER_IMAGE = ConfigBuilder("spark.kubernetes.container.image") .doc("Container image to use for Spark containers. Individual container types " + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala new file mode 100644 index 0000000000000..08cb52dbfd48a --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesExternalShuffleService.scala @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import java.nio.ByteBuffer +import java.util.concurrent.{ConcurrentHashMap, TimeUnit} + +import scala.collection.JavaConverters._ + +import org.apache.spark.{SecurityManager, SparkConf} +import org.apache.spark.deploy.ExternalShuffleService +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.internal.Logging +import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} +import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler +import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver, ShuffleServiceHeartbeat} +import org.apache.spark.network.util.TransportConf +import org.apache.spark.util.ThreadUtils + +// Todo: handle duplicated code with Mesos + +/** + * An RPC endpoint that receives registration requests from Spark drivers running on Kubernetes. + * It tracks driver heartbeats and removes an application's shuffle files once it times out. + */ +private[spark] class KubernetesShuffleBlockHandler( + transportConf: TransportConf, + cleanerIntervalS: Long) + extends ExternalShuffleBlockHandler(transportConf, null) with Logging { + + // Map of app id to app state; must be initialized before the cleaner below is scheduled. + private val connectedApps = new ConcurrentHashMap[String, AppState]() + + ThreadUtils.newDaemonSingleThreadScheduledExecutor("shuffle-cleaner-watcher") + .scheduleAtFixedRate(new CleanerThread(), 0, cleanerIntervalS, TimeUnit.SECONDS) + + protected override def handleMessage( + message: BlockTransferMessage, + client: TransportClient, + callback: RpcResponseCallback): Unit = { + message match { + case RegisterDriverParam(appId, appState) => + val address = client.getSocketAddress + val timeout = appState.heartbeatTimeout + logInfo(s"Received registration request from app $appId (remote address $address, " + + s"heartbeat timeout $timeout ms).") + if (connectedApps.containsKey(appId)) { // not contains(): that one tests values + logWarning(s"Received a registration request from app
$appId, but it was already " + + s"registered") + } + connectedApps.put(appId, appState) + callback.onSuccess(ByteBuffer.allocate(0)) + case Heartbeat(appId) => + val address = client.getSocketAddress + Option(connectedApps.get(appId)) match { + case Some(existingAppState) => + logTrace(s"Received ShuffleServiceHeartbeat from app '$appId' (remote " + + s"address $address).") + existingAppState.lastHeartbeat = System.nanoTime() + case None => + logWarning(s"Received ShuffleServiceHeartbeat from an unknown app (remote " + + s"address $address, appId '$appId').") + } + case _ => super.handleMessage(message, client, callback) + } + } + + private object Heartbeat { + def unapply(h: ShuffleServiceHeartbeat): Option[String] = Some(h.getAppId) + } + + private class AppState(val heartbeatTimeout: Long, @volatile var lastHeartbeat: Long) + + /** An extractor object for matching [[RegisterDriver]] message. */ + private object RegisterDriverParam { + def unapply(r: RegisterDriver): Option[(String, AppState)] = + Some((r.getAppId, new AppState(r.getHeartbeatTimeoutMs, System.nanoTime()))) + } + + private class CleanerThread extends Runnable { + override def run(): Unit = { + val now = System.nanoTime() + connectedApps.asScala.foreach { case (appId, appState) => + if (now - appState.lastHeartbeat > appState.heartbeatTimeout * 1000 * 1000) { + logInfo(s"Application $appId timed out.
Removing shuffle files.") + connectedApps.remove(appId) + applicationRemoved(appId, true) + } + } + } + } +} + +private[spark] class KubernetesExternalShuffleService(conf: SparkConf, + securityManager: SecurityManager) + extends ExternalShuffleService(conf, securityManager) { + + protected override def newShuffleBlockHandler( + tConf: TransportConf): ExternalShuffleBlockHandler = { + val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S) + new KubernetesShuffleBlockHandler(tConf, cleanerIntervalS) + } +} + +private[spark] object KubernetesExternalShuffleService extends Logging { + def main(args: Array[String]): Unit = { + ExternalShuffleService.main(args, + (conf: SparkConf, sm: SecurityManager) => new KubernetesExternalShuffleService(conf, sm)) + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index f5fae7cc8c470..dc61ee2d7e3cc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -24,6 +24,38 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.util.Utils private[spark] object KubernetesUtils { + def parseKeyValuePairs( + maybeKeyValues: Option[String], + configKey: String, + keyValueType: String): Map[String, String] = { + + maybeKeyValues.map(keyValues => { + keyValues.split(",").map(_.trim).filterNot(_.isEmpty).map(keyValue => { + keyValue.split("=", 2).toSeq match { + case Seq(k, v) => + (k, v) + case _ => + throw new SparkException(s"Custom $keyValueType set by $configKey must be a" + + s" comma-separated list of key-value pairs, with format <key>=<value>." + + s" Got value: $keyValue.
All values: $keyValues") + } + }).toMap + }).getOrElse(Map.empty[String, String]) + } + + def parsePrefixedKeyValuePairs( + sparkConf: SparkConf, + prefix: String, + configType: String): Map[String, String] = { + val fromPrefix = sparkConf.getAllWithPrefix(prefix) + fromPrefix.groupBy(_._1).foreach { + case (key, values) => + require(values.size == 1, + s"Cannot have multiple values for a given $configType key, got key $key with" + + s" values $values") + } + fromPrefix.toMap + } /** * Extract and parse Spark configuration properties with a given name prefix and diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala index ff5ad6673b309..fc75f36721c49 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStep.scala @@ -174,17 +174,17 @@ private[spark] class DriverKubernetesCredentialsFeatureStep(kubernetesConf: Kube * which may be empty if the user-specified credential is empty. 
*/ private def resolveSecretData( - userSpecifiedCredential: Option[String], - secretName: String): Map[String, String] = { + userSpecifiedCredential: Option[String], + secretName: String): Map[String, String] = { userSpecifiedCredential.map { valueBase64 => Map(secretName -> valueBase64) }.getOrElse(Map.empty[String, String]) } private def resolveSecretLocation( - mountedUserSpecified: Option[String], - valueMountedFromSubmitter: Option[String], - mountedCanonicalLocation: String): Option[String] = { + mountedUserSpecified: Option[String], + valueMountedFromSubmitter: Option[String], + mountedCanonicalLocation: String): Option[String] = { mountedUserSpecified.orElse(valueMountedFromSubmitter.map { _ => mountedCanonicalLocation }) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala new file mode 100644 index 0000000000000..3267c7a547806 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/ExternalShuffleLocalDirsFeatureStep.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.features + +import io.fabric8.kubernetes.api.model.{Volume, VolumeBuilder, VolumeMount, VolumeMountBuilder} +import org.apache.commons.io.FilenameUtils + +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf} +import org.apache.spark.util.Utils + +class ExternalShuffleLocalDirsFeatureStep( + conf: KubernetesConf[_ <: KubernetesRoleSpecificConf]) extends LocalDirsFeatureStep(conf) { + + private val resolvedLocalDirs = Utils.getConfiguredLocalDirs(conf.sparkConf) + + override def getDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = { + // TODO: Using hostPath for the local directory will also make it such that the + // other uses of the local directory - broadcasting and caching - will also write + // to the directory that the shuffle service is aware of. It would be better for + // these directories to be separate so that the lifetime of the non-shuffle scratch + // space is tied to an emptyDir instead of the hostPath. This requires a change in + // core Spark as well. 
+ resolvedLocalDirs.zipWithIndex.map { + case (shuffleDir, shuffleDirIndex) => + val volumeName = s"$shuffleDirIndex-${FilenameUtils.getBaseName(shuffleDir)}" + val volume = new VolumeBuilder() + .withName(volumeName) + .withNewHostPath(shuffleDir) + .build() + val volumeMount = new VolumeMountBuilder() + .withName(volumeName) + .withMountPath(shuffleDir) + .build() + (volume, volumeMount) + } + } +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala index be386e119d465..9d1bbb2ac970a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStep.scala @@ -16,12 +16,11 @@ */ package org.apache.spark.deploy.k8s.features -import java.nio.file.Paths import java.util.UUID -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder, VolumeBuilder, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model._ -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, KubernetesRoleSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ private[spark] class LocalDirsFeatureStep( @@ -40,10 +39,10 @@ private[spark] class LocalDirsFeatureStep( .split(",") private val useLocalDirTmpFs = conf.get(KUBERNETES_LOCAL_DIRS_TMPFS) - override def configurePod(pod: SparkPod): SparkPod = { + def getDirVolumesWithMounts(): Seq[(Volume, VolumeMount)] = { val localDirVolumes = resolvedLocalDirs .zipWithIndex - .map { case (localDir, index) => + .map { case (_, index) => new VolumeBuilder() .withName(s"spark-local-dir-${index + 1}") .withNewEmptyDir() @@ -59,9 +58,14 @@ 
private[spark] class LocalDirsFeatureStep( .withMountPath(localDirPath) .build() } + localDirVolumes.zip(localDirVolumeMounts) + } + + override def configurePod(pod: SparkPod): SparkPod = { + val volumesWithMounts = getDirVolumesWithMounts() val podWithLocalDirVolumes = new PodBuilder(pod.pod) .editSpec() - .addToVolumes(localDirVolumes: _*) + .addToVolumes(volumesWithMounts.map(_._1): _*) .endSpec() .build() val containerWithLocalDirVolumeMounts = new ContainerBuilder(pod.container) @@ -69,7 +73,7 @@ private[spark] class LocalDirsFeatureStep( .withName("SPARK_LOCAL_DIRS") .withValue(resolvedLocalDirs.mkString(",")) .endEnv() - .addToVolumeMounts(localDirVolumeMounts: _*) + .addToVolumeMounts(volumesWithMounts.map(_._2): _*) .build() SparkPod(podWithLocalDirVolumes, containerWithLocalDirVolumeMounts) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 9999c62c878df..7e6524cd15534 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -23,10 +23,13 @@ import com.google.common.cache.CacheBuilder import io.fabric8.kubernetes.client.Config import org.apache.spark.SparkContext -import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesRoleSpecificConf, KubernetesUtils, SparkKubernetesClientFactory} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.internal.Logging +import org.apache.spark.deploy.k8s.features.{ExternalShuffleLocalDirsFeatureStep, LocalDirsFeatureStep} +import org.apache.spark.internal.{config, 
Logging} +import org.apache.spark.network.netty.SparkTransportConf +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} import org.apache.spark.util.{SystemClock, ThreadUtils} @@ -79,15 +82,38 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit val removedExecutorsCache = CacheBuilder.newBuilder() .expireAfterWrite(3, TimeUnit.MINUTES) .build[java.lang.Long, java.lang.Long]() + + val kubernetesShuffleManager = if (sc.conf.get( + org.apache.spark.internal.config.SHUFFLE_SERVICE_ENABLED)) { + val kubernetesExternalShuffleClient = new KubernetesExternalShuffleClient( + SparkTransportConf.fromSparkConf(sc.conf, "shuffle"), + sc.env.securityManager, + sc.env.securityManager.isAuthenticationEnabled(), + sc.conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT)) + Some(new KubernetesExternalShuffleManagerImpl( + sc.conf, + kubernetesClient, + kubernetesExternalShuffleClient)) + } else None + + val localDirsFeatureStepFactory: KubernetesConf[_ <: KubernetesRoleSpecificConf] + => LocalDirsFeatureStep = if (kubernetesShuffleManager.isDefined) { + new ExternalShuffleLocalDirsFeatureStep(_) + } else { + new LocalDirsFeatureStep(_) + } + + val kubernetesExecutorBuilder = new KubernetesExecutorBuilder( + provideLocalDirsStep = localDirsFeatureStepFactory) val executorPodsLifecycleEventHandler = new ExecutorPodsLifecycleManager( sc.conf, - new KubernetesExecutorBuilder(), + kubernetesExecutorBuilder, kubernetesClient, snapshotsStore, removedExecutorsCache) val executorPodsAllocator = new ExecutorPodsAllocator( - sc.conf, new KubernetesExecutorBuilder(), kubernetesClient, snapshotsStore, new SystemClock()) + sc.conf, kubernetesExecutorBuilder, kubernetesClient, snapshotsStore, new SystemClock()) val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, @@ -107,7 +133,8 @@ private[spark] class 
KubernetesClusterManager extends ExternalClusterManager wit executorPodsAllocator, executorPodsLifecycleEventHandler, podsWatchEventSource, - podsPollingEventSource) + podsPollingEventSource, + kubernetesShuffleManager) } override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index fa6dc2c479bbf..f134bee60705b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -19,12 +19,15 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.ExecutorService import io.fabric8.kubernetes.client.KubernetesClient +import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} +import org.apache.spark.SparkEnv import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.rpc.{RpcAddress, RpcEnv} +import org.apache.spark.rpc.{RpcAddress, RpcCallContext, RpcEnv} import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig} import org.apache.spark.util.{ThreadUtils, Utils} private[spark] class KubernetesClusterSchedulerBackend( @@ -36,7 +39,8 @@ private[spark] class KubernetesClusterSchedulerBackend( podAllocator: ExecutorPodsAllocator, lifecycleEventHandler: ExecutorPodsLifecycleManager, watchEvents: ExecutorPodsWatchSnapshotSource, - pollEvents: ExecutorPodsPollingSnapshotSource) + 
pollEvents: ExecutorPodsPollingSnapshotSource, + kubernetesShuffleManager: Option[KubernetesExternalShuffleManager]) extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { private implicit val requestExecutorContext = ExecutionContext.fromExecutorService( @@ -61,6 +65,7 @@ private[spark] class KubernetesClusterSchedulerBackend( if (!Utils.isDynamicAllocationEnabled(conf)) { podAllocator.setTotalExpectedExecutors(initialExecutors) } + kubernetesShuffleManager.foreach(_.start(applicationId())) lifecycleEventHandler.start(this) podAllocator.start(applicationId()) watchEvents.start(applicationId()) @@ -93,6 +98,10 @@ private[spark] class KubernetesClusterSchedulerBackend( ThreadUtils.shutdown(requestExecutorsService) } + Utils.tryLogNonFatalError { + kubernetesShuffleManager.foreach(_.stop()) + } + Utils.tryLogNonFatalError { kubernetesClient.close() } @@ -136,6 +145,46 @@ private[spark] class KubernetesClusterSchedulerBackend( // to be deleted eventually. addressToExecutorId.get(rpcAddress).foreach(disableExecutor) } + + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + new PartialFunction[Any, Unit]() { + override def isDefinedAt(msg: Any): Boolean = { + msg match { + case RetrieveSparkAppConfig => + kubernetesShuffleManager.isDefined + case _ => false + } + } + + override def apply(msg: Any): Unit = { + msg match { + case RetrieveSparkAppConfig if kubernetesShuffleManager.isDefined => + val senderAddress = context.senderAddress + val allExecutorPods = kubernetesClient + .pods() + .withLabel(SPARK_APP_ID_LABEL, applicationId()) + .list() + .getItems + .asScala + val executor = allExecutorPods.find(pod => + pod.getStatus.getPodIP.equals(senderAddress.host) || + pod.getSpec.getHostname.equals(senderAddress.host)) + if (executor.isDefined) { + val shuffleSpecificProperties = kubernetesShuffleManager.get + .getShuffleServiceConfigurationForExecutor(executor.get) + val reply = SparkAppConfig( + sparkProperties ++ 
shuffleSpecificProperties, + SparkEnv.get.securityManager.getIOEncryptionKey(), + fetchHadoopDelegationTokens()) + context.reply(reply) + } else { + logError(s"Got RetrieveSparkAppConfig message from unknown executor" + + s" address $senderAddress") + } + } + } + }.orElse(super.receiveAndReply(context)) + } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala index 364b6fb367722..d48a66d373400 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala @@ -21,7 +21,7 @@ import org.apache.spark.deploy.k8s.features._ import org.apache.spark.deploy.k8s.features.{BasicExecutorFeatureStep, EnvSecretsFeatureStep, LocalDirsFeatureStep, MountSecretsFeatureStep} private[spark] class KubernetesExecutorBuilder( - provideBasicStep: (KubernetesConf [KubernetesExecutorSpecificConf]) + provideBasicStep: (KubernetesConf[KubernetesExecutorSpecificConf]) => BasicExecutorFeatureStep = new BasicExecutorFeatureStep(_), provideSecretsStep: (KubernetesConf[_ <: KubernetesRoleSpecificConf]) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala new file mode 100644 index 0000000000000..13745267a7832 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExternalShuffleManager.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watch, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness +import scala.collection.JavaConverters._ + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.KubernetesUtils +import org.apache.spark.internal.Logging +import org.apache.spark.network.shuffle.kubernetes.KubernetesExternalShuffleClient + +private[spark] trait KubernetesExternalShuffleManager { + def start(appId: String): Unit + + def stop(): Unit + + /** + * Returns the properties that should be applied for this executor pod, given that + * this executor will need to communicate with an external shuffle service. + * + * In practice, this seq will always have a size of 1, but since this method's semantics are that + * the returned values are key-value pairs to apply as properties, it is clearer to express + * this as a collection. 
+ */ + def getShuffleServiceConfigurationForExecutor(executorPod: Pod): Seq[(String, String)] +} + +private[spark] class KubernetesExternalShuffleManagerImpl( + sparkConf: SparkConf, + client: KubernetesClient, + shuffleClient: KubernetesExternalShuffleClient) + extends KubernetesExternalShuffleManager with Logging { + + private val shuffleNamespace = sparkConf.get(KUBERNETES_SHUFFLE_NAMESPACE) + private val shufflePodLabels = KubernetesUtils.parseKeyValuePairs( + sparkConf.get(KUBERNETES_SHUFFLE_LABELS), + KUBERNETES_SHUFFLE_LABELS.key, + "shuffle-labels") + if (shufflePodLabels.isEmpty) { + throw new SparkException(s"Dynamic allocation enabled " + + s"but no ${KUBERNETES_SHUFFLE_LABELS.key} specified") + } + private val externalShufflePort = sparkConf.getInt("spark.shuffle.service.port", 7337) + private val shufflePodCache = scala.collection.mutable.Map[String, String]() + private var watcher: Watch = _ + + override def start(appId: String): Unit = { + // seed the initial cache. + val pods = client.pods() + .inNamespace(shuffleNamespace) + .withLabels(shufflePodLabels.asJava) + .list() + pods.getItems.asScala.foreach { + pod => + if (Readiness.isReady(pod)) { + addShufflePodToCache(pod) + } else { + logWarning(s"Found unready shuffle pod ${pod.getMetadata.getName} " + + s"on node ${pod.getSpec.getNodeName}") + } + } + + watcher = client + .pods() + .inNamespace(shuffleNamespace) + .withLabels(shufflePodLabels.asJava) + .watch(new Watcher[Pod] { + override def eventReceived(action: Watcher.Action, p: Pod): Unit = { + action match { + case Action.DELETED | Action.ERROR => + shufflePodCache.remove(p.getSpec.getNodeName) + case Action.ADDED | Action.MODIFIED if Readiness.isReady(p) => + addShufflePodToCache(p) + } + } + override def onClose(e: KubernetesClientException): Unit = {} + }) + shuffleClient.init(appId) + } + + private def addShufflePodToCache(pod: Pod): Unit = shufflePodCache.synchronized { + if (shufflePodCache.contains(pod.getSpec.getNodeName)) { + val 
registeredPodName = shufflePodCache(pod.getSpec.getNodeName) + if (registeredPodName.equals(pod.getStatus.getPodIP)) { + logWarning(s"The same pod $registeredPodName is added again on ${pod.getSpec.getNodeName}") + } else { + throw new SparkException(s"Ambiguous specification of shuffle service pod. " + + s"Found multiple matching pods: ${pod.getMetadata.getName}, " + + s"$registeredPodName on ${pod.getSpec.getNodeName}") + } + } else { + shufflePodCache(pod.getSpec.getNodeName) = pod.getStatus.getPodIP + } + } + + override def stop(): Unit = { + watcher.close() + shuffleClient.close() + } + + override def getShuffleServiceConfigurationForExecutor(executorPod: Pod) + : Seq[(String, String)] = { + val nodeName = executorPod.getSpec.getNodeName + val shufflePodIp = shufflePodCache.synchronized { + shufflePodCache.getOrElse(nodeName, + throw new SparkException(s"Unable to find shuffle pod on node $nodeName")) + } + // Inform the shuffle pod about this application so it can watch. + shuffleClient.registerDriverWithShuffleService( + shufflePodIp, + externalShufflePort, + sparkConf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", + s"${sparkConf.getTimeAsSeconds("spark.network.timeout", "120s")}s"), + sparkConf.getTimeAsMs("spark.executor.heartbeatInterval", "10s") + ) + Seq((SPARK_SHUFFLE_SERVICE_HOST.key, shufflePodIp)) + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index 52e7a12dbaf06..9e935944ae106 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -95,7 +95,8 @@ class 
KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn podAllocator, lifecycleEventHandler, watchEvents, - pollEvents) { + pollEvents, + None) { override def applicationId(): String = TEST_SPARK_APP_ID } } diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index 216e8fe31becb..12b72848c3637 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -38,7 +38,7 @@ fi SPARK_K8S_CMD="$1" case "$SPARK_K8S_CMD" in - driver | driver-py | driver-r | executor) + driver | driver-py | driver-r | executor | shuffle-service) shift 1 ;; "") @@ -123,7 +123,13 @@ case "$SPARK_K8S_CMD" in --hostname $SPARK_EXECUTOR_POD_IP ) ;; - + shuffle-service) + CMD=( + ${JAVA_HOME}/bin/java + -cp "$SPARK_CLASSPATH" + org.apache.spark.deploy.k8s.KubernetesExternalShuffleService + ) + ;; *) echo "Unknown command: $SPARK_K8S_CMD" 1>&2 exit 1 diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala new file mode 100644 index 0000000000000..c44b7c9c01e7d --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DynamicAllocationTestSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.util + +import io.fabric8.kubernetes.api.model.{VolumeBuilder, VolumeMountBuilder} +import io.fabric8.kubernetes.api.model.extensions.DaemonSetBuilder +import scala.collection.JavaConverters._ + +import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite.k8sTestTag + +private[spark] trait DynamicAllocationTestSuite { k8sSuite: KubernetesSuite => + import DynamicAllocationTestSuite._ + + private def startShuffleService(): Unit = { + val volume = new VolumeBuilder() + .withName(VOLUME_NAME) + .withNewHostPath(EXECUTOR_LOCAL_DIR) + .build() + val volumeMount = new VolumeMountBuilder() + .withName(VOLUME_NAME) + .withMountPath(EXECUTOR_LOCAL_DIR) + .build() + val daemonSet = new DaemonSetBuilder() + .withNewMetadata() + .withName(SHUFFLE_SERVICE_NAME) + .withLabels(SHUFFLE_SERVICE_LABEL) + .endMetadata() + .withNewSpec() + .withNewTemplate() + .withNewMetadata() + .withLabels(SHUFFLE_SERVICE_LABEL) + .endMetadata() + .withNewSpec() + .addNewContainer() + .withName("k8s-shuffle-service") + .withImage(image) + .withArgs("shuffle-service") + .withVolumeMounts(volumeMount) + .endContainer() + .withVolumes(volume) + .endSpec() + .endTemplate() + .endSpec() + .build() + kubernetesTestComponents.kubernetesClient + .extensions() + .daemonSets() + .inNamespace(kubernetesTestComponents.namespace) + .create(daemonSet) + } + + private def stopShuffleService(): Unit = { + kubernetesTestComponents.kubernetesClient + .extensions() + .daemonSets() + 
.inNamespace(kubernetesTestComponents.namespace) + .withName(SHUFFLE_SERVICE_NAME) + .delete() + } + + test("Run in cluster mode with dynamic allocation.", k8sTestTag) { + startShuffleService() + try { + runAndVerifyCompletion() + } finally { + stopShuffleService() + } + } + + private def runAndVerifyCompletion(): Unit = { + sparkAppConf + .set("spark.local.dir", EXECUTOR_LOCAL_DIR) + .set("spark.kubernetes.shuffle.namespace", kubernetesTestComponents.namespace) + .set("spark.dynamicAllocation.enabled", "true") + .set("spark.shuffle.service.enabled", "true") + .set("spark.dynamicAllocation.maxExecutors", "4") + .set("spark.kubernetes.shuffle.labels", "app=k8s-shuffle-service") + runSparkApplicationAndVerifyCompletion( + containerLocalSparkDistroExamplesJar, + SPARK_GROUPBY_MAIN_CLASS, + Seq("RESULT: "), + Array("10", "400000", "2"), + doBasicDriverPodCheck, + doBasicExecutorPodCheck, + appLocator, + isJVM = true + ) + } +} + +private[spark] object DynamicAllocationTestSuite { + val SPARK_GROUPBY_MAIN_CLASS = "org.apache.spark.examples.SimpleSkewedGroupByTest" + val SHUFFLE_SERVICE_NAME = "k8s-external-shuffle-service" + val VOLUME_NAME = "shuffle-dir" + val EXECUTOR_LOCAL_DIR = "/tmp/spark-local" + val SHUFFLE_SERVICE_LABEL: util.Map[String, String] = Map("app" -> "k8s-shuffle-service").asJava +} diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index c99a907f98d0a..03164ebe604ff 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -38,12 +38,12 @@ import org.apache.spark.internal.Logging private[spark] class KubernetesSuite 
extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite - with PythonTestsSuite with ClientModeTestsSuite + with PythonTestsSuite with ClientModeTestsSuite with DynamicAllocationTestSuite with Logging with Eventually with Matchers { import KubernetesSuite._ - private var sparkHomeDir: Path = _ + protected var sparkHomeDir: Path = _ private var pyImage: String = _ private var rImage: String = _ diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala index 5615d6173eebd..fe8290257c17b 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -120,7 +120,7 @@ private[spark] object SparkAppLauncher extends Logging { appConf.toStringArray :+ appArguments.mainAppResource if (appArguments.appArgs.nonEmpty) { - commandLine += appArguments.appArgs.mkString(" ") + commandLine ++= appArguments.appArgs } logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}") ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala index 6494cbc18f33e..af85aae216784 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala +++ 
b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.deploy.k8s.integrationtest.backend.minikube -import java.io.File import java.nio.file.Paths import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala index 859aa836a3157..6d94b9efd1d29 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala @@ -28,8 +28,7 @@ import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.Logging import org.apache.spark.network.client.{RpcResponseCallback, TransportClient} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler -import org.apache.spark.network.shuffle.protocol.BlockTransferMessage -import org.apache.spark.network.shuffle.protocol.mesos.{RegisterDriver, ShuffleServiceHeartbeat} +import org.apache.spark.network.shuffle.protocol.{BlockTransferMessage, RegisterDriver, ShuffleServiceHeartbeat} import org.apache.spark.network.util.TransportConf import org.apache.spark.util.ThreadUtils ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add support for dynamic resource allocation > ------------------------------------------- > > Key: SPARK-24432 > URL: https://issues.apache.org/jira/browse/SPARK-24432 > Project: Spark > Issue Type: New Feature > Components: Kubernetes > Affects Versions: 2.4.0, 3.0.0 > Reporter: Yinan Li > Priority: Major > > This is an umbrella ticket for work on adding support for dynamic resource > allocation into the Kubernetes mode. This requires a Kubernetes-specific > external shuffle service. The feature is available in our fork at > github.com/apache-spark-on-k8s/spark. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org