This is an automated email from the ASF dual-hosted git repository. holden pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
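For orientation before the patch: the main user-facing addition below is the spark.kubernetes.allocation.pods.allocator setting. As a rough sketch only (the config key and its values — taken from the patch's Config.scala: direct, statefulset, or a fully qualified AbstractPodsAllocator class name — are real; the master URL, example class, and jar path are illustrative placeholders), opting into the statefulset allocator at submit time might look like:

# Hypothetical invocation; only the allocator conf key comes from this patch.
./bin/spark-submit \
  --master k8s://https://example.com:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.allocation.pods.allocator=statefulset \
  --class org.apache.spark.examples.SparkPi \
  local:///opt/spark/examples/jars/spark-examples.jar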
The following commit(s) were added to refs/heads/master by this push: new ff3f3c4 [SPARK-36058][K8S] Add support for statefulset APIs in K8s ff3f3c4 is described below commit ff3f3c45668364da9bd10992791b5ee9a46fea21 Author: Holden Karau <hka...@netflix.com> AuthorDate: Wed Aug 25 17:38:57 2021 -0700 [SPARK-36058][K8S] Add support for statefulset APIs in K8s ### What changes were proposed in this pull request? Generalize the pod allocator and add support for statefulsets. ### Why are the changes needed? Allocating individual pods in Spark is not ideal for some clusters, and using higher-level operators like statefulsets and replicasets can be useful. ### Does this PR introduce _any_ user-facing change? Yes, new config options. ### How was this patch tested? Completed: new unit & basic integration tests, and PV integration tests. Closes #33508 from holdenk/SPARK-36058-support-replicasets-or-job-api-like-things. Lead-authored-by: Holden Karau <hka...@netflix.com> Co-authored-by: Holden Karau <hol...@pigscanfly.ca> Signed-off-by: Holden Karau <hka...@netflix.com> --- .../executor/CoarseGrainedExecutorBackend.scala | 2 +- .../scala/org/apache/spark/storage/DiskStore.scala | 9 +- .../apache/spark/examples/MiniReadWriteTest.scala | 139 +++++++++++++ .../scala/org/apache/spark/deploy/k8s/Config.scala | 10 + .../org/apache/spark/deploy/k8s/Constants.scala | 1 + .../k8s/features/BasicExecutorFeatureStep.scala | 15 +- .../cluster/k8s/AbstractPodsAllocator.scala | 59 ++++++ .../cluster/k8s/ExecutorPodsAllocator.scala | 18 +- .../cluster/k8s/ExecutorPodsSnapshot.scala | 11 +- .../cluster/k8s/KubernetesClusterManager.scala | 38 +++- .../cluster/k8s/KubernetesClusterMessage.scala | 21 ++ .../k8s/KubernetesClusterSchedulerBackend.scala | 41 +++- .../cluster/k8s/KubernetesExecutorBackend.scala | 228 +++++++++++++++++++++ .../cluster/k8s/StatefulsetPodsAllocator.scala | 201 ++++++++++++++++++ .../features/BasicExecutorFeatureStepSuite.scala | 6 +- .../cluster/k8s/ExecutorPodsAllocatorSuite.scala | 31 ++- .../k8s/KubernetesClusterManagerSuite.scala | 58 ++++++ .../KubernetesClusterSchedulerBackendSuite.scala | 13 +- .../cluster/k8s/StatefulsetAllocatorSuite.scala | 153 ++++++++++++++ .../src/main/dockerfiles/spark/entrypoint.sh | 3 +- .../k8s/integrationtest/BasicTestsSuite.scala | 17 ++ .../k8s/integrationtest/KubernetesSuite.scala | 33 ++- .../deploy/k8s/integrationtest/PVTestsSuite.scala | 88 +++++++- 23 files changed, 1148 insertions(+), 47 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 76d01f8..c87e61a 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -61,7 +61,7 @@ private[spark] class CoarseGrainedExecutorBackend( private implicit val formats = DefaultFormats - private[executor] val stopping = new AtomicBoolean(false) + private[spark] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala index 7269913..f0334c5 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala @@ -59,7 +59,14 @@ private[spark] class DiskStore( */ def put(blockId: 
BlockId)(writeFunc: WritableByteChannel => Unit): Unit = { if (contains(blockId)) { - throw new IllegalStateException(s"Block $blockId is already present in the disk store") + logWarning(s"Block $blockId is already present in the disk store") + try { + diskManager.getFile(blockId).delete() + } catch { + case e: Exception => + throw new IllegalStateException( + s"Block $blockId is already present in the disk store and could not be deleted: $e") + } } logDebug(s"Attempting to put block $blockId") val startTimeNs = System.nanoTime() diff --git a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala new file mode 100644 index 0000000..5a74e1c --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples + +import java.io.File +import java.io.PrintWriter + +import scala.io.Source._ + +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.Utils + +/** + * Simple test for reading and writing to a distributed + * file system. This example does the following: + * + * 1. Reads local file + * 2. Computes word count on local file + * 3. Writes local file to a local dir on each executor + * 4. Reads the file back from each exec + * 5. Computes word count on the file using Spark + * 6. 
Compares the word count results + */ +object MiniReadWriteTest { + + private val NPARAMS = 1 + + private def readFile(filename: String): List[String] = { + Utils.tryWithResource(fromFile(filename))(_.getLines().toList) + } + + private def printUsage(): Unit = { + val usage = """Mini Read-Write Test + |Usage: localFile + |localFile - (string) location of local file to distribute to executors.""".stripMargin + + println(usage) + } + + private def parseArgs(args: Array[String]): File = { + if (args.length != NPARAMS) { + printUsage() + System.exit(1) + } + + var i = 0 + + val localFilePath = new File(args(i)) + if (!localFilePath.exists) { + System.err.println(s"Given path (${args(i)}) does not exist") + printUsage() + System.exit(1) + } + + if (!localFilePath.isFile) { + System.err.println(s"Given path (${args(i)}) is not a file") + printUsage() + System.exit(1) + } + localFilePath + } + + def runLocalWordCount(fileContents: List[String]): Int = { + fileContents.flatMap(_.split(" ")) + .flatMap(_.split("\t")) + .filter(_.nonEmpty) + .groupBy(w => w) + .mapValues(_.size) + .values + .sum + } + + def main(args: Array[String]): Unit = { + val localFilePath = parseArgs(args) + + println(s"Performing local word count from ${localFilePath}") + val fileContents = readFile(localFilePath.toString()) + println(s"File contents are ${fileContents}") + val localWordCount = runLocalWordCount(fileContents) + + println("Creating SparkSession") + val spark = SparkSession + .builder + .appName("Mini Read Write Test") + .getOrCreate() + + println("Writing local file to executors") + + // uses the fact that default parallelism is greater than the number of executors + val misc = spark.sparkContext.parallelize(1.to(10)) + misc.foreachPartition { + x => + new PrintWriter(localFilePath) { + try { + write(fileContents.mkString("\n")) + } finally { + close() + }} + } + + println("Reading file from execs and running Word Count") + val readFileRDD = spark.sparkContext.textFile(localFilePath.toString()) + + val dWordCount = readFileRDD + .flatMap(_.split(" ")) + .flatMap(_.split("\t")) + .filter(_.nonEmpty) + .map(w => (w, 1)) + .countByKey() + .values + .sum + + spark.stop() + if (localWordCount == dWordCount) { + println(s"Success! Local Word Count $localWordCount and " + + s"D Word Count $dWordCount agree.") + } else { + println(s"Failure! Local Word Count $localWordCount " + + s"and D Word Count $dWordCount disagree.") + } + } +} +// scalastyle:on println diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 2c4adee..2aa4fbc 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -323,6 +323,16 @@ private[spark] object Config extends Logging { .stringConf .createOptional + val KUBERNETES_ALLOCATION_PODS_ALLOCATOR = + ConfigBuilder("spark.kubernetes.allocation.pods.allocator") + .doc("Allocator to use for pods. Possible values are direct (the default) and statefulset, " + + "or a full class name of a class implementing AbstractPodsAllocator. " + + "Future versions may add Job or replicaset. 
This is a developer API and may change " + "or be removed at any time.") + .version("3.3.0") + .stringConf + .createWithDefault("direct") + val KUBERNETES_ALLOCATION_BATCH_SIZE = ConfigBuilder("spark.kubernetes.allocation.batch.size") .doc("Number of pods to launch at once in each round of executor allocation.") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index 543ca12..f4b362b 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -59,6 +59,7 @@ private[spark] object Constants { val ENV_APPLICATION_ID = "SPARK_APPLICATION_ID" val ENV_EXECUTOR_ID = "SPARK_EXECUTOR_ID" val ENV_EXECUTOR_POD_IP = "SPARK_EXECUTOR_POD_IP" + val ENV_EXECUTOR_POD_NAME = "SPARK_EXECUTOR_POD_NAME" val ENV_JAVA_OPT_PREFIX = "SPARK_JAVA_OPT_" val ENV_CLASSPATH = "SPARK_CLASSPATH" val ENV_DRIVER_BIND_ADDRESS = "SPARK_DRIVER_BIND_ADDRESS" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index a90d2ab..4d56468 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -139,6 +139,13 @@ private[spark] class BasicExecutorFeatureStep( .build()) .build()) } ++ { + Seq(new EnvVarBuilder() + .withName(ENV_EXECUTOR_POD_NAME) + .withValueFrom(new EnvVarSourceBuilder() + .withNewFieldRef("v1", "metadata.name") + .build()) + .build()) + } ++ { if (kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty) { Option(secMgr.getSecretKey()).map { authSecret => new EnvVarBuilder() @@ -260,16 +267,22 @@ private[spark] class BasicExecutorFeatureStep( .withUid(pod.getMetadata.getUid) .build() } + + val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match { + case "statefulset" => "Always" + case _ => "Never" + } val executorPodBuilder = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(name) .addToLabels(kubernetesConf.labels.asJava) + .addToLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfile.id.toString) .addToAnnotations(kubernetesConf.annotations.asJava) .addToOwnerReferences(ownerReference.toSeq: _*) .endMetadata() .editOrNewSpec() .withHostname(hostname) - .withRestartPolicy("Never") + .withRestartPolicy(policy) .addToNodeSelector(kubernetesConf.nodeSelector.asJava) .addToNodeSelector(kubernetesConf.executorNodeSelector.asJava) .addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala new file mode 100644 index 0000000..2e0d4fa --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/AbstractPodsAllocator.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import io.fabric8.kubernetes.api.model.Pod + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.resource.ResourceProfile + + +/** + * :: DeveloperApi :: + * An abstract interface for allowing different types of pod allocation. + * + * The internal Spark implementations are [[StatefulsetPodsAllocator]] + * and [[ExecutorPodsAllocator]]. This may be useful for folks integrating with custom schedulers + * such as Volcano, Yunikorn, etc. + * + * This API may change or be removed at any time. + * + * @since 3.3.0 + */ +@DeveloperApi +abstract class AbstractPodsAllocator { + /* + * Set the total expected executors for an application. + */ + def setTotalExpectedExecutors(resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit + /* + * Reference to driver pod. + */ + def driverPod: Option[Pod] + /* + * Whether the pod for a given exec id has been deleted. + */ + def isDeleted(executorId: String): Boolean + /* + * Start hook. + */ + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit + /* + * Stop hook. + */ + def stop(applicationId: String): Unit +} diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index cee5360..043f0e9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -37,13 +37,13 @@ import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT import org.apache.spark.resource.ResourceProfile import org.apache.spark.util.{Clock, Utils} -private[spark] class ExecutorPodsAllocator( +class ExecutorPodsAllocator( conf: SparkConf, secMgr: SecurityManager, executorBuilder: KubernetesExecutorBuilder, kubernetesClient: KubernetesClient, snapshotsStore: ExecutorPodsSnapshotsStore, - clock: Clock) extends Logging { + clock: Clock) extends AbstractPodsAllocator() with Logging { private val EXECUTOR_ID_COUNTER = new AtomicInteger(0) @@ -97,12 +97,15 @@ private[spark] class ExecutorPodsAllocator( private var lastSnapshot = ExecutorPodsSnapshot() + private var appId: String = _ + // Executors that have been deleted by this allocator but not yet detected as deleted in // a snapshot from the API server. This is used to deny registration from these executors // if they happen to come up before the deletion takes effect. 
@volatile private var deletedExecutorIds = Set.empty[Long] def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { + appId = applicationId driverPod.foreach { pod => // Wait until the driver pod is ready before starting executors, as the headless service won't // be resolvable by DNS until the driver pod is ready. @@ -461,6 +464,16 @@ private[spark] class ExecutorPodsAllocator( true } } + + override def stop(applicationId: String): Unit = { + Utils.tryLogNonFatalError { + kubernetesClient + .pods() + .withLabel(SPARK_APP_ID_LABEL, applicationId) + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .delete() + } + } } private[spark] object ExecutorPodsAllocator { @@ -471,5 +484,4 @@ private[spark] object ExecutorPodsAllocator { val r = slots % consumers.size consumers.take(r).map((_, d + 1)) ++ consumers.takeRight(consumers.size - r).map((_, d)) } - } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala index 76c17cf..ff47c17 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsSnapshot.scala @@ -60,8 +60,15 @@ object ExecutorPodsSnapshot extends Logging { } private def toStatesByExecutorId(executorPods: Seq[Pod]): Map[Long, ExecutorPodState] = { - executorPods.map { pod => - (pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL).toLong, toState(pod)) + executorPods.flatMap { pod => + pod.getMetadata.getLabels.get(SPARK_EXECUTOR_ID_LABEL) match { + case "EXECID" | null => + // The exec label has not yet been assigned + None + case id => + // We have a "real" id label + Some((id.toLong, toState(pod))) + } }.toMap } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index 4d28d38..9497349 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -19,14 +19,15 @@ package org.apache.spark.scheduler.cluster.k8s import java.io.File import io.fabric8.kubernetes.client.Config +import io.fabric8.kubernetes.client.KubernetesClient -import org.apache.spark.SparkContext +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesUtils, SparkKubernetesClientFactory} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants.DEFAULT_EXECUTOR_CONTAINER_NAME import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} -import org.apache.spark.util.{SystemClock, ThreadUtils} +import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { @@ -109,13 +110,7 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit kubernetesClient, snapshotsStore) - val executorPodsAllocator = new ExecutorPodsAllocator( - 
sc.conf, - sc.env.securityManager, - new KubernetesExecutorBuilder(), - kubernetesClient, - snapshotsStore, - new SystemClock()) + val executorPodsAllocator = makeExecutorPodsAllocator(sc, kubernetesClient, snapshotsStore) val podsWatchEventSource = new ExecutorPodsWatchSnapshotSource( snapshotsStore, @@ -138,6 +133,31 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit podsPollingEventSource) } + private[k8s] def makeExecutorPodsAllocator(sc: SparkContext, kubernetesClient: KubernetesClient, + snapshotsStore: ExecutorPodsSnapshotsStore) = { + val executorPodsAllocatorName = sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match { + case "statefulset" => + classOf[StatefulsetPodsAllocator].getName + case "direct" => + classOf[ExecutorPodsAllocator].getName + case fullClass => + fullClass + } + + val cls = Utils.classForName[AbstractPodsAllocator](executorPodsAllocatorName) + val cstr = cls.getConstructor( + classOf[SparkConf], classOf[org.apache.spark.SecurityManager], + classOf[KubernetesExecutorBuilder], classOf[KubernetesClient], + classOf[ExecutorPodsSnapshotsStore], classOf[Clock]) + cstr.newInstance( + sc.conf, + sc.env.securityManager, + new KubernetesExecutorBuilder(), + kubernetesClient, + snapshotsStore, + new SystemClock()) + } + override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = { scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend) } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterMessage.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterMessage.scala new file mode 100644 index 0000000..4a8dae6 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterMessage.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import java.io.Serializable + +case class GenerateExecID(podName: String) extends Serializable diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index 25f6851..d8e97e1 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -17,6 +17,7 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.{ScheduledExecutorService, TimeUnit} +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.concurrent.Future @@ -46,7 +47,7 @@ private[spark] class KubernetesClusterSchedulerBackend( kubernetesClient: KubernetesClient, executorService: ScheduledExecutorService, snapshotsStore: ExecutorPodsSnapshotsStore, - podAllocator: ExecutorPodsAllocator, + podAllocator: AbstractPodsAllocator, lifecycleEventHandler: ExecutorPodsLifecycleManager, watchEvents: ExecutorPodsWatchSnapshotSource, pollEvents: ExecutorPodsPollingSnapshotSource) @@ -97,10 +98,11 @@ private[spark] class KubernetesClusterSchedulerBackend( override def start(): Unit = { super.start() + // Must be called before setting the executors + podAllocator.start(applicationId(), this) val initExecs = Map(defaultProfile -> initialExecutors) podAllocator.setTotalExpectedExecutors(initExecs) lifecycleEventHandler.start(this) - podAllocator.start(applicationId(), this) watchEvents.start(applicationId()) pollEvents.start(applicationId()) if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) { @@ -144,13 +146,9 @@ private[spark] class KubernetesClusterSchedulerBackend( } if (shouldDeleteExecutors) { - Utils.tryLogNonFatalError { - kubernetesClient - .pods() - .withLabel(SPARK_APP_ID_LABEL, applicationId()) - .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) - .delete() - } + + podAllocator.stop(applicationId()) + if (!conf.get(KUBERNETES_EXECUTOR_DISABLE_CONFIGMAP)) { Utils.tryLogNonFatalError { kubernetesClient @@ -278,6 +276,8 @@ private[spark] class KubernetesClusterSchedulerBackend( new KubernetesDriverEndpoint() } + val execId = new AtomicInteger(0) + override protected def createTokenManager(): Option[HadoopDelegationTokenManager] = { Some(new HadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)) } @@ -287,12 +287,33 @@ private[spark] class KubernetesClusterSchedulerBackend( } private class KubernetesDriverEndpoint extends DriverEndpoint { + private def generateExecID(context: RpcCallContext): PartialFunction[Any, Unit] = { + case x: GenerateExecID => + val newId = execId.incrementAndGet().toString + context.reply(newId) + // Generally this should complete quickly but safer not to block in case we're in the + // middle of an etcd failover or otherwise slower writes. 
+ val labelTask = new Runnable() { + override def run(): Unit = Utils.tryLogNonFatalError { + // Label the pod with its exec ID + kubernetesClient.pods() + .withName(x.podName) + .edit({p: Pod => new PodBuilder(p).editMetadata() + .addToLabels(SPARK_EXECUTOR_ID_LABEL, newId.toString) + .endMetadata() + .build()}) + } + } + executorService.execute(labelTask) + } private def ignoreRegisterExecutorAtStoppedContext: PartialFunction[Any, Unit] = { case _: RegisterExecutor if sc.isStopped => // No-op } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = - ignoreRegisterExecutorAtStoppedContext.orElse(super.receiveAndReply(context)) + generateExecID(context).orElse( + ignoreRegisterExecutorAtStoppedContext.orElse( + super.receiveAndReply(context))) override def onDisconnected(rpcAddress: RpcAddress): Unit = { // Don't do anything besides disabling the executor - allow the Kubernetes API events to diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala new file mode 100644 index 0000000..dd06688 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBackend.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.worker.WorkerWatcher +import org.apache.spark.executor.CoarseGrainedExecutorBackend +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID +import org.apache.spark.rpc._ +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ +import org.apache.spark.util.Utils + +private[spark] object KubernetesExecutorBackend extends Logging { + + // Message used internally to start the executor when the driver successfully accepted the + // registration request. 
+ case object RegisteredExecutor + + case class Arguments( + driverUrl: String, + executorId: String, + bindAddress: String, + hostname: String, + cores: Int, + appId: String, + workerUrl: Option[String], + resourcesFileOpt: Option[String], + resourceProfileId: Int, + podName: String) + + def main(args: Array[String]): Unit = { + val createFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) => + CoarseGrainedExecutorBackend = { case (rpcEnv, arguments, env, resourceProfile, execId) => + new CoarseGrainedExecutorBackend(rpcEnv, arguments.driverUrl, execId, + arguments.bindAddress, arguments.hostname, arguments.cores, + env, arguments.resourcesFileOpt, resourceProfile) + } + run(parseArguments(args, this.getClass.getCanonicalName.stripSuffix("$")), createFn) + System.exit(0) + } + + def run( + arguments: Arguments, + backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile, String) => + CoarseGrainedExecutorBackend): Unit = { + + Utils.initDaemon(log) + + SparkHadoopUtil.get.runAsSparkUser { () => + // Debug code + Utils.checkHost(arguments.hostname) + + // Bootstrap to fetch the driver's Spark properties. + val executorConf = new SparkConf + val fetcher = RpcEnv.create( + "driverPropsFetcher", + arguments.bindAddress, + arguments.hostname, + -1, + executorConf, + new SecurityManager(executorConf), + numUsableCores = 0, + clientMode = true) + + var driver: RpcEndpointRef = null + val nTries = 3 + for (i <- 0 until nTries if driver == null) { + try { + driver = fetcher.setupEndpointRefByURI(arguments.driverUrl) + } catch { + case e: Throwable => if (i == nTries - 1) { + throw e + } + } + } + + val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(arguments.resourceProfileId)) + val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", arguments.appId)) + val execId: String = arguments.executorId match { + case null | "EXECID" | "" => + // We need to resolve the exec id dynamically + driver.askSync[String](GenerateExecID(arguments.podName)) + case id => + id + } + fetcher.shutdown() + + // Create SparkEnv using properties we fetched from the driver. 
+ val driverConf = new SparkConf() + for ((key, value) <- props) { + // this is required for SSL in standalone mode + if (SparkConf.isExecutorStartupConf(key)) { + driverConf.setIfMissing(key, value) + } else { + driverConf.set(key, value) + } + } + + cfg.hadoopDelegationCreds.foreach { tokens => + SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf) + } + + driverConf.set(EXECUTOR_ID, execId) + val env = SparkEnv.createExecutorEnv(driverConf, execId, arguments.bindAddress, + arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false) + + val backend = backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile, execId) + env.rpcEnv.setupEndpoint("Executor", backend) + arguments.workerUrl.foreach { url => + env.rpcEnv.setupEndpoint("WorkerWatcher", + new WorkerWatcher(env.rpcEnv, url, isChildProcessStopping = backend.stopping)) + } + env.rpcEnv.awaitTermination() + } + } + + def parseArguments(args: Array[String], classNameForEntry: String): Arguments = { + var driverUrl: String = null + var executorId: String = null + var bindAddress: String = null + var hostname: String = null + var cores: Int = 0 + var resourcesFileOpt: Option[String] = None + var appId: String = null + var workerUrl: Option[String] = None + var resourceProfileId: Int = DEFAULT_RESOURCE_PROFILE_ID + var podName: String = null + + var argv = args.toList + while (!argv.isEmpty) { + argv match { + case ("--driver-url") :: value :: tail => + driverUrl = value + argv = tail + case ("--executor-id") :: value :: tail => + executorId = value + argv = tail + case ("--bind-address") :: value :: tail => + bindAddress = value + argv = tail + case ("--hostname") :: value :: tail => + hostname = value + argv = tail + case ("--cores") :: value :: tail => + cores = value.toInt + argv = tail + case ("--resourcesFile") :: value :: tail => + resourcesFileOpt = Some(value) + argv = tail + case ("--app-id") :: value :: tail => + appId = value + argv = tail + case ("--worker-url") :: value :: tail => + // Worker url is used in spark standalone mode to enforce fate-sharing with worker + workerUrl = Some(value) + argv = tail + case ("--resourceProfileId") :: value :: tail => + resourceProfileId = value.toInt + argv = tail + case ("--podName") :: value :: tail => + podName = value + argv = tail + case Nil => + case tail => + // scalastyle:off println + System.err.println(s"Unrecognized options: ${tail.mkString(" ")}") + // scalastyle:on println + printUsageAndExit(classNameForEntry) + } + } + + if (hostname == null) { + hostname = Utils.localHostName() + log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself") + } + + if (driverUrl == null || executorId == null || cores <= 0 || appId == null) { + printUsageAndExit(classNameForEntry) + } + + if (bindAddress == null) { + bindAddress = hostname + } + + Arguments(driverUrl, executorId, bindAddress, hostname, cores, appId, workerUrl, + resourcesFileOpt, resourceProfileId, podName) + } + + private def printUsageAndExit(classNameForEntry: String): Unit = { + // scalastyle:off println + System.err.println( + s""" + |Usage: $classNameForEntry [options] + | + | Options are: + | --driver-url <driverUrl> + | --executor-id <executorId> + | --bind-address <bindAddress> + | --hostname <hostname> + | --cores <cores> + | --resourcesFile <fileWithJSONResourceInformation> + | --app-id <appid> + | --worker-url <workerUrl> + | --resourceProfileId <id> + | --podName <podName> + |""".stripMargin) + // scalastyle:on println + System.exit(1) + } +} diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetPodsAllocator.scala new file mode 100644 index 0000000..0d00d96 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetPodsAllocator.scala @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model.{PersistentVolumeClaim, + PersistentVolumeClaimBuilder, PodSpec, PodSpecBuilder, PodTemplateSpec} +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.{SecurityManager, SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesConf +import org.apache.spark.deploy.k8s.KubernetesUtils.addOwnerReference +import org.apache.spark.internal.Logging +import org.apache.spark.resource.ResourceProfile +import org.apache.spark.util.{Clock, Utils} + +class StatefulsetPodsAllocator( + conf: SparkConf, + secMgr: SecurityManager, + executorBuilder: KubernetesExecutorBuilder, + kubernetesClient: KubernetesClient, + snapshotsStore: ExecutorPodsSnapshotsStore, + clock: Clock) extends AbstractPodsAllocator() with Logging { + + private val rpIdToResourceProfile = new mutable.HashMap[Int, ResourceProfile] + + private val driverPodReadinessTimeout = conf.get(KUBERNETES_ALLOCATION_DRIVER_READINESS_TIMEOUT) + + private val namespace = conf.get(KUBERNETES_NAMESPACE) + + private val kubernetesDriverPodName = conf + .get(KUBERNETES_DRIVER_POD_NAME) + + val driverPod = kubernetesDriverPodName + .map(name => Option(kubernetesClient.pods() + .withName(name) + .get()) + .getOrElse(throw new SparkException( + s"No pod was found named $name in the cluster in the " + + s"namespace $namespace (this was supposed to be the driver pod)."))) + + private var appId: String = _ + + def start(applicationId: String, schedulerBackend: KubernetesClusterSchedulerBackend): Unit = { + appId = applicationId + driverPod.foreach { pod => + // Wait until the driver pod is ready before starting executors, as the headless service won't + // be resolvable by DNS until the driver pod is ready. 
+ Utils.tryLogNonFatalError { + kubernetesClient + .pods() + .withName(pod.getMetadata.getName) + .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS) + } + } + } + + def setTotalExpectedExecutors(resourceProfileToTotalExecs: Map[ResourceProfile, Int]): Unit = { + if (appId == null) { + throw new SparkException("setTotalExpectedExecutors called before start of allocator.") + } + resourceProfileToTotalExecs.foreach { case (rp, numExecs) => + rpIdToResourceProfile.getOrElseUpdate(rp.id, rp) + setTargetExecutorsReplicaset(numExecs, appId, rp.id) + } + } + + def isDeleted(executorId: String): Boolean = false + + // For now just track the sets created; in the future maybe track the requested value too. + val setsCreated = new mutable.HashSet[Int]() + + private def setName(applicationId: String, rpid: Int): String = { + s"spark-s-${applicationId}-${rpid}" + } + + private def setTargetExecutorsReplicaset( + expected: Int, + applicationId: String, + resourceProfileId: Int): Unit = { + if (setsCreated.contains(resourceProfileId)) { + val statefulset = kubernetesClient.apps().statefulSets().withName( + setName(applicationId, resourceProfileId)) + statefulset.scale(expected, false /* wait */) + } else { + // We need to make the new statefulset, which is going to involve building + // a pod. + val executorConf = KubernetesConf.createExecutorConf( + conf, + "EXECID", // template exec IDs + applicationId, + driverPod, + resourceProfileId) + val resolvedExecutorSpec = executorBuilder.buildFromFeatures(executorConf, secMgr, + kubernetesClient, rpIdToResourceProfile(resourceProfileId)) + val executorPod = resolvedExecutorSpec.pod + val podSpecBuilder = executorPod.pod.getSpec() match { + case null => new PodSpecBuilder() + case s => new PodSpecBuilder(s) + } + val podWithAttachedContainer: PodSpec = podSpecBuilder + .addToContainers(executorPod.container) + .build() + + val meta = executorPod.pod.getMetadata() + + // Resources that need to be created; volumes are per-pod, which is all we care about here. + val resources = resolvedExecutorSpec.executorKubernetesResources + // We'll let PVCs be handled by the statefulset. Note the user is responsible for + // cleaning up PVCs. Future work: integrate with KEP1847 once stabilized. + val dynamicVolumeClaims = resources.filter(_.getKind == "PersistentVolumeClaim") + .map(_.asInstanceOf[PersistentVolumeClaim]) + // Remove the dynamic volumes from our pod + val dynamicVolumeClaimNames: Set[String] = dynamicVolumeClaims.map(_.getMetadata().getName()) + .toSet + val podVolumes = podWithAttachedContainer.getVolumes().asScala + val staticVolumes = podVolumes.filter { v => + val pvc = v.getPersistentVolumeClaim() + pvc match { + case null => true + case _ => + !dynamicVolumeClaimNames.contains(pvc.getClaimName()) + } + } + val dynamicClaimToVolumeName = podVolumes.filter { v => + val pvc = v.getPersistentVolumeClaim() + pvc match { + case null => false + case _ => + dynamicVolumeClaimNames.contains(pvc.getClaimName()) + } + }.map { v => + (v.getPersistentVolumeClaim().getClaimName(), v.getName()) + }.toMap + // This just mutates it (Java-style API). + podWithAttachedContainer.setVolumes(staticVolumes.asJava) + // Rewrite the dynamic volume names to not reference our fake EXECID. + val newNamedVolumes = dynamicVolumeClaims.zipWithIndex.map { case (v, i) => + new PersistentVolumeClaimBuilder(v) + .editMetadata() + .withName(dynamicClaimToVolumeName.get(v.getMetadata().getName()).get) + .endMetadata() + .build() + } + + // Create a pod template spec from the pod. 
+ val podTemplateSpec = new PodTemplateSpec(meta, podWithAttachedContainer) + + val statefulSet = new io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder() + .withNewMetadata() + .withName(setName(applicationId, resourceProfileId)) + .withNamespace(conf.get(KUBERNETES_NAMESPACE)) + .endMetadata() + .withNewSpec() + .withPodManagementPolicy("Parallel") + .withReplicas(expected) + .withNewSelector() + .addToMatchLabels(SPARK_APP_ID_LABEL, applicationId) + .addToMatchLabels(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE) + .addToMatchLabels(SPARK_RESOURCE_PROFILE_ID_LABEL, resourceProfileId.toString) + .endSelector() + .withTemplate(podTemplateSpec) + .addAllToVolumeClaimTemplates(newNamedVolumes.asJava) + .endSpec() + .build() + + addOwnerReference(driverPod.get, Seq(statefulSet)) + kubernetesClient.apps().statefulSets().create(statefulSet) + setsCreated += (resourceProfileId) + } + } + + override def stop(applicationId: String): Unit = { + // Cleanup the statefulsets when we stop + setsCreated.foreach { rpid => + Utils.tryLogNonFatalError { + kubernetesClient.apps().statefulSets().withName(setName(applicationId, rpid)).delete() + } + } + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index 363029e..e59772d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -455,9 +455,11 @@ class BasicExecutorFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { ENV_EXECUTOR_MEMORY -> "1024m", ENV_APPLICATION_ID -> KubernetesTestConf.APP_ID, ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL, - ENV_EXECUTOR_POD_IP -> null, ENV_SPARK_USER -> Utils.getCurrentUserName(), - ENV_RESOURCE_PROFILE_ID -> "0") + ENV_RESOURCE_PROFILE_ID -> "0", + // These are populated by K8s on scheduling + ENV_EXECUTOR_POD_IP -> null, + ENV_EXECUTOR_POD_NAME -> null) val extraJavaOptsStart = additionalEnvVars.keys.count(_.startsWith(ENV_JAVA_OPT_PREFIX)) val extraJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 5b33da6..d263bd0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -101,6 +101,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { private var podsAllocatorUnderTest: ExecutorPodsAllocator = _ + val appId = "testapp" + before { MockitoAnnotations.openMocks(this).close() when(kubernetesClient.pods()).thenReturn(podOperations) @@ -209,7 +211,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { val counter = PrivateMethod[AtomicInteger](Symbol("EXECUTOR_ID_COUNTER"))() assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0) - podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1))) + 
podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> (podAllocationSize + 1))) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (execId <- 1 until podAllocationSize) { snapshotsStore.updatePod(runningExecutor(execId)) @@ -230,7 +233,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { test("When a current batch reaches error states immediately, re-request" + " them on the next batch.") { - podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> podAllocationSize)) + podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> podAllocationSize)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5) for (execId <- 1 until podAllocationSize) { snapshotsStore.updatePod(runningExecutor(execId)) @@ -242,6 +246,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1)) } + test("Verify stopping deletes the labeled pods") { + when(podOperations + .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)) + .thenReturn(podOperations) + when(podOperations + .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)) + .thenReturn(labeledPods) + podsAllocatorUnderTest.stop(TEST_SPARK_APP_ID) + verify(labeledPods).delete() + } + test("When an executor is requested but the API does not report it in a reasonable time, retry" + " requesting that executor.") { when(podOperations @@ -253,7 +268,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(podOperations .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")) .thenReturn(labeledPods) - podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> 1)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(1)) waitForExecutorPodsClock.setTime(podCreationTimeout + 1) @@ -281,7 +297,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { waitForExecutorPodsClock.setTime(startTime) // Target 1 executor, make sure it's requested, even with an empty initial snapshot. - podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> 1)) assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1) verify(podOperations).create(podWithAttachedContainerForId(1)) @@ -293,7 +310,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { verify(podOperations, never()).delete() // Request 3 more executors, make sure all are requested. - podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4)) + podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> 4)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3) verify(podOperations).create(podWithAttachedContainerForId(2)) @@ -310,7 +328,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { // Scale down to 1. Pending executors (both acknowledged and not) should be deleted. 
waitForExecutorPodsClock.advance(executorIdleTimeout * 2) - podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1)) + podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> 1)) snapshotsStore.notifySubscribers() assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0) verify(podOperations, times(4)).create(any()) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala new file mode 100644 index 0000000..ae1477e --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManagerSuite.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.scheduler.cluster.k8s + +import io.fabric8.kubernetes.client.KubernetesClient +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfter + +import org.apache.spark._ +import org.apache.spark.deploy.k8s.Config._ + +class KubernetesClusterManagerSuite extends SparkFunSuite with BeforeAndAfter { + + @Mock + private var kubernetesClient: KubernetesClient = _ + + @Mock + private var sc: SparkContext = _ + + @Mock + private var env: SparkEnv = _ + + @Mock + private var sparkConf: SparkConf = _ + + before { + MockitoAnnotations.openMocks(this).close() + when(sc.conf).thenReturn(sparkConf) + when(sc.conf.get(KUBERNETES_DRIVER_POD_NAME)).thenReturn(None) + when(sc.env).thenReturn(env) + } + + test("constructing an AbstractPodsAllocator works") { + val validConfigs = List("statefulset", "direct", + "org.apache.spark.scheduler.cluster.k8s.StatefulsetPodsAllocator", + "org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator") + validConfigs.foreach { c => + val manager = new KubernetesClusterManager() + when(sc.conf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR)).thenReturn(c) + manager.makeExecutorPodsAllocator(sc, kubernetesClient, null) + } + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala index bf17aa3..537685d 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala @@ -33,7 +33,7 @@ import 
org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager} -import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv} +import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{ExecutorKilled, LiveListenerBus, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RegisterExecutor, RemoveExecutor, StopDriver} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend @@ -93,6 +93,9 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn @Mock private var pollEvents: ExecutorPodsPollingSnapshotSource = _ + @Mock + private var context: RpcCallContext = _ + private var driverEndpoint: ArgumentCaptor[RpcEndpoint] = _ private var schedulerBackendUnderTest: KubernetesClusterSchedulerBackend = _ @@ -149,7 +152,7 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn verify(eventQueue).stop() verify(watchEvents).stop() verify(pollEvents).stop() - verify(labeledPods).delete() + verify(podAllocator).stop(TEST_SPARK_APP_ID) verify(labledConfigMaps).delete() verify(kubernetesClient).close() } @@ -250,4 +253,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn endpoint.receiveAndReply(null).apply( RegisterExecutor("1", null, "host1", 1, Map.empty, Map.empty, Map.empty, 0)) } + + test("Dynamically fetch an executor ID") { + val endpoint = schedulerBackendUnderTest.createDriverEndpoint() + endpoint.receiveAndReply(context).apply(GenerateExecID("cheeseBurger")) + verify(context).reply("1") + } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetAllocatorSuite.scala new file mode 100644 index 0000000..5f8ceb2 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulsetAllocatorSuite.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.scheduler.cluster.k8s + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.api.model.apps.StatefulSet +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl._ +import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.ArgumentMatchers.{any, eq => meq} +import org.mockito.Mockito.{never, times, verify, when} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesExecutorSpec} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.Fabric8Aliases._ +import org.apache.spark.internal.config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT +import org.apache.spark.resource._ +import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._ + +class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter { + + private val driverPodName = "driver" + + private val driverPod = new PodBuilder() + .withNewMetadata() + .withName(driverPodName) + .addToLabels(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID) + .addToLabels(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE) + .withUid("driver-pod-uid") + .endMetadata() + .build() + + private val conf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) + .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s") + + + private val defaultProfile: ResourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf) + private val secondProfile: ResourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf) + + private val secMgr = new SecurityManager(conf) + + + @Mock + private var kubernetesClient: KubernetesClient = _ + + @Mock + private var appOperations: AppsAPIGroupDSL = _ + + @Mock + private var statefulSetOperations: MixedOperation[ + apps.StatefulSet, apps.StatefulSetList, RollableScalableResource[apps.StatefulSet]] = _ + + @Mock + private var editableSet: RollableScalableResource[apps.StatefulSet] = _ + + @Mock + private var podOperations: PODS = _ + + + @Mock + private var driverPodOperations: PodResource[Pod] = _ + + private var podsAllocatorUnderTest: StatefulsetPodsAllocator = _ + + private var snapshotsStore: DeterministicExecutorPodsSnapshotsStore = _ + + @Mock + private var executorBuilder: KubernetesExecutorBuilder = _ + + @Mock + private var schedulerBackend: KubernetesClusterSchedulerBackend = _ + + val appId = "testapp" + + private def executorPodAnswer(): Answer[KubernetesExecutorSpec] = + (invocation: InvocationOnMock) => { + val k8sConf: KubernetesExecutorConf = invocation.getArgument(0) + KubernetesExecutorSpec(executorPodWithId(0, + k8sConf.resourceProfileId.toInt), Seq.empty) + } + + before { + MockitoAnnotations.openMocks(this).close() + when(kubernetesClient.pods()).thenReturn(podOperations) + when(kubernetesClient.apps()).thenReturn(appOperations) + when(appOperations.statefulSets()).thenReturn(statefulSetOperations) + when(statefulSetOperations.withName(any())).thenReturn(editableSet) + when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) + when(driverPodOperations.get).thenReturn(driverPod) + when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod) + when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr), + meq(kubernetesClient), 
any(classOf[ResourceProfile]))).thenAnswer(executorPodAnswer()) + snapshotsStore = new DeterministicExecutorPodsSnapshotsStore() + podsAllocatorUnderTest = new StatefulsetPodsAllocator( + conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, null) + when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty) + podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend) + } + + test("Validate initial statefulSet creation & cleanup with two resource profiles") { + val rprof = new ResourceProfileBuilder() + val taskReq = new TaskResourceRequests().resource("gpu", 1) + val execReq = + new ExecutorResourceRequests().resource("gpu", 2, "myscript", "nvidia") + rprof.require(taskReq).require(execReq) + val immrprof = new ResourceProfile(rprof.executorResources, rprof.taskResources) + podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> 10, + immrprof -> 420)) + // One statefulset is created per resource profile, and both are deleted on stop. + verify(statefulSetOperations, times(2)).create(any()) + podsAllocatorUnderTest.stop(appId) + verify(editableSet, times(2)).delete() + } + + test("Validate statefulSet scale up") { + podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> 10)) + val captor = ArgumentCaptor.forClass(classOf[StatefulSet]) + verify(statefulSetOperations, times(1)).create(captor.capture()) + val set = captor.getValue() + // The set should land in the default namespace with parallel pod management. + val namespace = set.getMetadata().getNamespace() + assert(namespace === "default") + val spec = set.getSpec() + assert(spec.getReplicas() === 10) + assert(spec.getPodManagementPolicy() === "Parallel") + verify(podOperations, never()).create(any()) + podsAllocatorUnderTest.setTotalExpectedExecutors( + Map(defaultProfile -> 20)) + verify(editableSet, times(1)).scale(any(), any()) + } +} diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh index f722471..c233bd8 100755 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/spark/entrypoint.sh @@ -85,13 +85,14 @@ case "$1" in -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH" - org.apache.spark.executor.CoarseGrainedExecutorBackend + org.apache.spark.scheduler.cluster.k8s.KubernetesExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --resourceProfileId $SPARK_RESOURCE_PROFILE_ID + --podName $SPARK_EXECUTOR_POD_NAME ) ;; diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala index 1c12123..6db4bee 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/BasicTestsSuite.scala @@ -16,7 +16,11 @@ */ package org.apache.spark.deploy.k8s.integrationtest +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.Pod +import org.scalatest.concurrent.Eventually +import org.scalatest.matchers.should.Matchers._ import org.apache.spark.TestUtils
import org.apache.spark.launcher.SparkLauncher @@ -25,11 +29,24 @@ private[spark] trait BasicTestsSuite { k8sSuite: KubernetesSuite => import BasicTestsSuite._ import KubernetesSuite.k8sTestTag + import KubernetesSuite.{TIMEOUT, INTERVAL} test("Run SparkPi with no resources", k8sTestTag) { runSparkPiAndVerifyCompletion() } + test("Run SparkPi with no resources & statefulset allocation", k8sTestTag) { + sparkAppConf.set("spark.kubernetes.allocation.pods.allocator", "statefulset") + runSparkPiAndVerifyCompletion() + // Verify there is no dangling statefulset. + // This depends on the garbage collection happening inside of K8s, so give it some time. + Eventually.eventually(TIMEOUT, INTERVAL) { + val sets = kubernetesTestComponents.kubernetesClient.apps().statefulSets().list().getItems + val scalaSets = sets.asScala + scalaSets.size shouldBe 0 + } + } + test("Run SparkPi with a very long application name.", k8sTestTag) { sparkAppConf.set("spark.app.name", "long" * 40) runSparkPiAndVerifyCompletion() diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index d65f594..06080ab 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -100,11 +100,17 @@ class KubernetesSuite extends SparkFunSuite .withLabel("spark-role", "executor") .list() .getItems.asScala.foreach { execPod => - logInfo(s"\nBEGIN executor (${execPod.getMetadata.getName}) POD log:\n" + + val podLog = try { kubernetesTestComponents .kubernetesClient .pods() .withName(execPod.getMetadata.getName) - .getLog) + .getLog + } catch { + case e: io.fabric8.kubernetes.client.KubernetesClientException => + s"Error fetching log (pod is likely not ready): ${e}" + } + logInfo(s"\nBEGIN executor (${execPod.getMetadata.getName}) POD log:\n" + + podLog) logInfo(s"END executor (${execPod.getMetadata.getName}) POD log") } } @@ -237,6 +243,28 @@ class KubernetesSuite extends SparkFunSuite Option((interval, None))) } + protected def runMiniReadWriteAndVerifyCompletion( + wordCount: Int, + appResource: String = containerLocalSparkDistroExamplesJar, + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, + appArgs: Array[String] = Array.empty[String], + isJVM: Boolean = true, + interval: Option[PatienceConfiguration.Interval] = None): Unit = { + runSparkApplicationAndVerifyCompletion( + appResource, + SPARK_MINI_READ_WRITE_TEST, + Seq(s"Success!
Local Word Count $wordCount and " + + s"D Word Count $wordCount agree."), + Seq(), + appArgs, + driverPodChecker, + executorPodChecker, + isJVM, + None, + Option((interval, None))) + } + protected def runSparkRemoteCheckAndVerifyCompletion( appResource: String = containerLocalSparkDistroExamplesJar, driverPodChecker: Pod => Unit = doBasicDriverPodCheck, @@ -570,6 +598,7 @@ private[spark] object KubernetesSuite { val MinikubeTag = Tag("minikube") val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest" + val SPARK_MINI_READ_WRITE_TEST = "org.apache.spark.examples.MiniReadWriteTest" val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest" val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest" val TIMEOUT = PatienceConfiguration.Timeout(Span(3, Minutes)) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala index 86f8cdd..0362b34 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => import PVTestsSuite._ - private def setupLocalStorage(): Unit = { + private def setupLocalStorageClass(): Unit = { val scBuilder = new StorageClassBuilder() .withKind("StorageClass") .withApiVersion("storage.k8s.io/v1") @@ -37,6 +37,21 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => .endMetadata() .withProvisioner("kubernetes.io/no-provisioner") .withVolumeBindingMode("WaitForFirstConsumer") + try { + kubernetesTestComponents + .kubernetesClient + .storage() + .storageClasses() + .create(scBuilder.build()) + } catch { + case _: io.fabric8.kubernetes.client.KubernetesClientException => + // Ignore the error; a dangling storage class may be left over from an earlier run. + } + } + + private def setupLocalStorage(): Unit = { + + setupLocalStorageClass() val pvBuilder = new PersistentVolumeBuilder() .withKind("PersistentVolume") @@ -77,12 +92,6 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => kubernetesTestComponents .kubernetesClient - .storage() - .storageClasses() - .create(scBuilder.build()) - - kubernetesTestComponents - .kubernetesClient .persistentVolumes() .create(pvBuilder.build()) @@ -122,6 +131,71 @@ private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite => } } + test("PVs with local hostpath storage on statefulsets", k8sTestTag, MinikubeTag) { + sparkAppConf + .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path", + CONTAINER_MOUNT_PATH) + .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName", + PVC_NAME) + .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", + CONTAINER_MOUNT_PATH) + .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", + PVC_NAME) + .set("spark.kubernetes.allocation.pods.allocator", "statefulset") + val file = Utils.createTempFile(FILE_CONTENTS, HOST_PATH) + try { + setupLocalStorage() + runMiniReadWriteAndVerifyCompletion(
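+        // Expected word count for the verification step: the suite waits for the
+        // driver log to contain the "Success!" line that
+        // runMiniReadWriteAndVerifyCompletion builds from this value.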
FILE_CONTENTS.split(" ").length, + driverPodChecker = (driverPod: Pod) => { + doBasicDriverPodCheck(driverPod) + }, + executorPodChecker = (executorPod: Pod) => { + doBasicExecutorPodCheck(executorPod) + }, + appArgs = Array(s"$CONTAINER_MOUNT_PATH/$file"), + interval = Some(PV_TESTS_INTERVAL) + ) + } finally { + // make sure this always runs + deleteLocalStorage() + } + } + + test("PVs with local hostpath and storageClass on statefulsets", k8sTestTag, MinikubeTag) { + sparkAppConf + .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path", + CONTAINER_MOUNT_PATH) + .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName", + PVC_NAME) + .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path", + CONTAINER_MOUNT_PATH) + .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName", + PVC_NAME + "OnDemand") + .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.storageClass", + "standard") + .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.sizeLimit", "1G") + .set("spark.kubernetes.allocation.pods.allocator", "statefulset") + val file = Utils.createTempFile(FILE_CONTENTS, HOST_PATH) + try { + setupLocalStorage() + runMiniReadWriteAndVerifyCompletion( + FILE_CONTENTS.split(" ").length, + driverPodChecker = (driverPod: Pod) => { + doBasicDriverPodCheck(driverPod) + }, + executorPodChecker = (executorPod: Pod) => { + doBasicExecutorPodCheck(executorPod) + }, + appArgs = Array(s"$CONTAINER_MOUNT_PATH/$file"), + interval = Some(PV_TESTS_INTERVAL) + ) + } finally { + // make sure this always runs + deleteLocalStorage() + } + } + test("PVs with local storage", k8sTestTag, MinikubeTag) { sparkAppConf .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",
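Usage note: the statefulset allocator exercised above is opt-in through the spark.kubernetes.allocation.pods.allocator setting. A minimal sketch of enabling it from application code follows; it assumes a standard SparkSession entry point, and the app name is an illustrative placeholder rather than anything this patch defines:

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    // Ask the K8s scheduler backend to manage executors through a statefulset
    // instead of allocating individual pods (the pre-existing behavior).
    val conf = new SparkConf()
      .setAppName("statefulset-allocation-demo") // placeholder app name
      .set("spark.kubernetes.allocation.pods.allocator", "statefulset")
    val spark = SparkSession.builder().config(conf).getOrCreate()

The same key can also be supplied with --conf on spark-submit, which is how the integration tests set it via sparkAppConf.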