Repository: spark Updated Branches: refs/heads/master 32ec269d0 -> 171f6ddad
http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala new file mode 100644 index 0000000..c0e7bb2 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStep.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps.initcontainer + +import org.apache.spark.deploy.k8s.MountSecretsBootstrap + +/** + * An init-container configuration step for mounting user-specified secrets onto user-specified + * paths. + * + * @param bootstrap a utility actually handling mounting of the secrets + */ +private[spark] class InitContainerMountSecretsStep( + bootstrap: MountSecretsBootstrap) extends InitContainerConfigurationStep { + + override def configureInitContainer(spec: InitContainerSpec) : InitContainerSpec = { + val (driverPod, initContainer) = bootstrap.mountSecrets( + spec.driverPod, + spec.initContainer) + spec.copy( + driverPod = driverPod, + initContainer = initContainer + ) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala new file mode 100644 index 0000000..b52c343 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerSpec.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps.initcontainer + +import io.fabric8.kubernetes.api.model.{Container, HasMetadata, Pod} + +/** + * Represents a specification of the init-container for the driver pod. + * + * @param properties properties that should be set on the init-container + * @param driverSparkConf Spark configuration properties that will be carried back to the driver + * @param initContainer the init-container object + * @param driverContainer the driver container object + * @param driverPod the driver pod object + * @param dependentResources resources the init-container depends on to work + */ +private[spark] case class InitContainerSpec( + properties: Map[String, String], + driverSparkConf: Map[String, String], + initContainer: Container, + driverContainer: Container, + driverPod: Pod, + dependentResources: Seq[HasMetadata]) http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala new file mode 100644 index 0000000..4a4b628 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainer.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.k8s + +import java.io.File +import java.util.concurrent.TimeUnit + +import scala.concurrent.{ExecutionContext, Future} + +import org.apache.spark.{SecurityManager => SparkSecurityManager, SparkConf} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.internal.Logging +import org.apache.spark.util.{ThreadUtils, Utils} + +/** + * Process that fetches files from a resource staging server and/or arbitrary remote locations. + * + * The init-container can handle fetching files from any of those sources, but not all of the + * sources need to be specified. This allows for composing multiple instances of this container + * with different configurations for different download sources, or using the same container to + * download everything at once. + */ +private[spark] class SparkPodInitContainer( + sparkConf: SparkConf, + fileFetcher: FileFetcher) extends Logging { + + private val maxThreadPoolSize = sparkConf.get(INIT_CONTAINER_MAX_THREAD_POOL_SIZE) + private implicit val downloadExecutor = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("download-executor", maxThreadPoolSize)) + + private val jarsDownloadDir = new File(sparkConf.get(JARS_DOWNLOAD_LOCATION)) + private val filesDownloadDir = new File(sparkConf.get(FILES_DOWNLOAD_LOCATION)) + + private val remoteJars = sparkConf.get(INIT_CONTAINER_REMOTE_JARS) + private val remoteFiles = sparkConf.get(INIT_CONTAINER_REMOTE_FILES) + + private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT) + + def run(): Unit = { + logInfo(s"Downloading remote jars: $remoteJars") + downloadFiles( + remoteJars, + jarsDownloadDir, + s"Remote jars download directory specified at $jarsDownloadDir does not exist " + + "or is not a directory.") + + logInfo(s"Downloading remote files: $remoteFiles") + downloadFiles( + remoteFiles, + filesDownloadDir, + s"Remote files download directory specified at $filesDownloadDir does not exist " + + "or is not a directory.") + + downloadExecutor.shutdown() + downloadExecutor.awaitTermination(downloadTimeoutMinutes, TimeUnit.MINUTES) + } + + private def downloadFiles( + filesCommaSeparated: Option[String], + downloadDir: File, + errMessage: String): Unit = { + filesCommaSeparated.foreach { files => + require(downloadDir.isDirectory, errMessage) + Utils.stringToSeq(files).foreach { file => + Future[Unit] { + fileFetcher.fetchFile(file, downloadDir) + } + } + } + } +} + +private class FileFetcher(sparkConf: SparkConf, securityManager: SparkSecurityManager) { + + def fetchFile(uri: String, targetDir: File): Unit = { + Utils.fetchFile( + url = uri, + targetDir = targetDir, + conf = sparkConf, + securityMgr = securityManager, + hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf), + timestamp = System.currentTimeMillis(), + useCache = false) + } +} + +object SparkPodInitContainer extends Logging { + + def main(args: Array[String]): Unit = { + logInfo("Starting init-container to download Spark application dependencies.") + val sparkConf = new SparkConf(true) + if (args.nonEmpty) { + Utils.loadDefaultSparkProperties(sparkConf, args(0)) + } + + val securityManager = new SparkSecurityManager(sparkConf) + val fileFetcher = new FileFetcher(sparkConf, securityManager) + new SparkPodInitContainer(sparkConf, fileFetcher).run() + logInfo("Finished downloading application dependencies.") + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala index 7022615..ba5d891 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactory.scala @@ -21,35 +21,35 @@ import scala.collection.JavaConverters._ import io.fabric8.kubernetes.api.model._ import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, PodWithDetachedInitContainer} import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.ConfigurationUtils import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.config.{EXECUTOR_CLASS_PATH, EXECUTOR_JAVA_OPTIONS, EXECUTOR_MEMORY, EXECUTOR_MEMORY_OVERHEAD} import org.apache.spark.util.Utils /** - * A factory class for configuring and creating executor pods. + * A factory class for bootstrapping and creating executor pods with the given bootstrapping + * components. + * + * @param sparkConf Spark configuration + * @param mountSecretsBootstrap an optional component for mounting user-specified secrets onto + * user-specified paths into the executor container + * @param initContainerBootstrap an optional component for bootstrapping the executor init-container + * if one is needed, i.e., when there are remote dependencies to + * localize + * @param initContainerMountSecretsBootstrap an optional component for mounting user-specified + * secrets onto user-specified paths into the executor + * init-container */ -private[spark] trait ExecutorPodFactory { - - /** - * Configure and construct an executor pod with the given parameters. - */ - def createExecutorPod( - executorId: String, - applicationId: String, - driverUrl: String, - executorEnvs: Seq[(String, String)], - driverPod: Pod, - nodeToLocalTaskCount: Map[String, Int]): Pod -} - -private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) - extends ExecutorPodFactory { +private[spark] class ExecutorPodFactory( + sparkConf: SparkConf, + mountSecretsBootstrap: Option[MountSecretsBootstrap], + initContainerBootstrap: Option[InitContainerBootstrap], + initContainerMountSecretsBootstrap: Option[MountSecretsBootstrap]) { private val executorExtraClasspath = sparkConf.get(EXECUTOR_CLASS_PATH) - private val executorLabels = ConfigurationUtils.parsePrefixedKeyValuePairs( + private val executorLabels = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) require( @@ -64,11 +64,11 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") private val executorAnnotations = - ConfigurationUtils.parsePrefixedKeyValuePairs( + KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) private val nodeSelector = - ConfigurationUtils.parsePrefixedKeyValuePairs( + KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) @@ -94,7 +94,10 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) private val executorCores = sparkConf.getDouble("spark.executor.cores", 1) private val executorLimitCores = sparkConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) - override def createExecutorPod( + /** + * Configure and construct an executor pod with the given parameters. + */ + def createExecutorPod( executorId: String, applicationId: String, driverUrl: String, @@ -198,7 +201,7 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) .endSpec() .build() - val containerWithExecutorLimitCores = executorLimitCores.map { limitCores => + val containerWithLimitCores = executorLimitCores.map { limitCores => val executorCpuLimitQuantity = new QuantityBuilder(false) .withAmount(limitCores) .build() @@ -209,9 +212,33 @@ private[spark] class ExecutorPodFactoryImpl(sparkConf: SparkConf) .build() }.getOrElse(executorContainer) - new PodBuilder(executorPod) + val (maybeSecretsMountedPod, maybeSecretsMountedContainer) = + mountSecretsBootstrap.map { bootstrap => + bootstrap.mountSecrets(executorPod, containerWithLimitCores) + }.getOrElse((executorPod, containerWithLimitCores)) + + val (bootstrappedPod, bootstrappedContainer) = + initContainerBootstrap.map { bootstrap => + val podWithInitContainer = bootstrap.bootstrapInitContainer( + PodWithDetachedInitContainer( + maybeSecretsMountedPod, + new ContainerBuilder().build(), + maybeSecretsMountedContainer)) + + val (pod, mayBeSecretsMountedInitContainer) = + initContainerMountSecretsBootstrap.map { bootstrap => + bootstrap.mountSecrets(podWithInitContainer.pod, podWithInitContainer.initContainer) + }.getOrElse((podWithInitContainer.pod, podWithInitContainer.initContainer)) + + val bootstrappedPod = KubernetesUtils.appendInitContainer( + pod, mayBeSecretsMountedInitContainer) + + (bootstrappedPod, podWithInitContainer.mainContainer) + }.getOrElse((maybeSecretsMountedPod, maybeSecretsMountedContainer)) + + new PodBuilder(bootstrappedPod) .editSpec() - .addToContainers(containerWithExecutorLimitCores) + .addToContainers(bootstrappedContainer) .endSpec() .build() } http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala index b8bb152..a942db6 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala @@ -21,9 +21,9 @@ import java.io.File import io.fabric8.kubernetes.client.Config import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.deploy.k8s.{InitContainerBootstrap, KubernetesUtils, MountSecretsBootstrap, SparkKubernetesClientFactory} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} import org.apache.spark.util.ThreadUtils @@ -45,6 +45,59 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit masterURL: String, scheduler: TaskScheduler): SchedulerBackend = { val sparkConf = sc.getConf + val initContainerConfigMap = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_NAME) + val initContainerConfigMapKey = sparkConf.get(INIT_CONTAINER_CONFIG_MAP_KEY_CONF) + + if (initContainerConfigMap.isEmpty) { + logWarning("The executor's init-container config map is not specified. Executors will " + + "therefore not attempt to fetch remote or submitted dependencies.") + } + + if (initContainerConfigMapKey.isEmpty) { + logWarning("The executor's init-container config map key is not specified. Executors will " + + "therefore not attempt to fetch remote or submitted dependencies.") + } + + // Only set up the bootstrap if they've provided both the config map key and the config map + // name. The config map might not be provided if init-containers aren't being used to + // bootstrap dependencies. + val initContainerBootstrap = for { + configMap <- initContainerConfigMap + configMapKey <- initContainerConfigMapKey + } yield { + val initContainerImage = sparkConf + .get(INIT_CONTAINER_IMAGE) + .getOrElse(throw new SparkException( + "Must specify the init-container image when there are remote dependencies")) + new InitContainerBootstrap( + initContainerImage, + sparkConf.get(CONTAINER_IMAGE_PULL_POLICY), + sparkConf.get(JARS_DOWNLOAD_LOCATION), + sparkConf.get(FILES_DOWNLOAD_LOCATION), + configMap, + configMapKey, + SPARK_POD_EXECUTOR_ROLE, + sparkConf) + } + + val executorSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs( + sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX) + val mountSecretBootstrap = if (executorSecretNamesToMountPaths.nonEmpty) { + Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths)) + } else { + None + } + // Mount user-specified executor secrets also into the executor's init-container. The + // init-container may need credentials in the secrets to be able to download remote + // dependencies. The executor's main container and its init-container share the secrets + // because the init-container is sort of an implementation details and this sharing + // avoids introducing a dedicated configuration property just for the init-container. + val initContainerMountSecretsBootstrap = if (initContainerBootstrap.nonEmpty && + executorSecretNamesToMountPaths.nonEmpty) { + Some(new MountSecretsBootstrap(executorSecretNamesToMountPaths)) + } else { + None + } val kubernetesClient = SparkKubernetesClientFactory.createKubernetesClient( KUBERNETES_MASTER_INTERNAL_URL, @@ -54,7 +107,12 @@ private[spark] class KubernetesClusterManager extends ExternalClusterManager wit Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) - val executorPodFactory = new ExecutorPodFactoryImpl(sparkConf) + val executorPodFactory = new ExecutorPodFactory( + sparkConf, + mountSecretBootstrap, + initContainerBootstrap, + initContainerMountSecretsBootstrap) + val allocatorExecutor = ThreadUtils .newDaemonSingleThreadScheduledExecutor("kubernetes-pod-allocator") val requestExecutorsService = ThreadUtils.newDaemonCachedThreadPool( http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala new file mode 100644 index 0000000..f193b1f --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigOrchestratorSuite.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.submit.steps._ + +class DriverConfigOrchestratorSuite extends SparkFunSuite { + + private val DRIVER_IMAGE = "driver-image" + private val IC_IMAGE = "init-container-image" + private val APP_ID = "spark-app-id" + private val LAUNCH_TIME = 975256L + private val APP_NAME = "spark" + private val MAIN_CLASS = "org.apache.spark.examples.SparkPi" + private val APP_ARGS = Array("arg1", "arg2") + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/driver" + + test("Base submission steps with a main app resource.") { + val sparkConf = new SparkConf(false) + .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE) + val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") + val orchestrator = new DriverConfigOrchestrator( + APP_ID, + LAUNCH_TIME, + Some(mainAppResource), + APP_NAME, + MAIN_CLASS, + APP_ARGS, + sparkConf) + validateStepTypes( + orchestrator, + classOf[BasicDriverConfigurationStep], + classOf[DriverServiceBootstrapStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep] + ) + } + + test("Base submission steps without a main app resource.") { + val sparkConf = new SparkConf(false) + .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE) + val orchestrator = new DriverConfigOrchestrator( + APP_ID, + LAUNCH_TIME, + Option.empty, + APP_NAME, + MAIN_CLASS, + APP_ARGS, + sparkConf) + validateStepTypes( + orchestrator, + classOf[BasicDriverConfigurationStep], + classOf[DriverServiceBootstrapStep], + classOf[DriverKubernetesCredentialsStep] + ) + } + + test("Submission steps with an init-container.") { + val sparkConf = new SparkConf(false) + .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE) + .set(INIT_CONTAINER_IMAGE, IC_IMAGE) + .set("spark.jars", "hdfs://localhost:9000/var/apps/jars/jar1.jar") + val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") + val orchestrator = new DriverConfigOrchestrator( + APP_ID, + LAUNCH_TIME, + Some(mainAppResource), + APP_NAME, + MAIN_CLASS, + APP_ARGS, + sparkConf) + validateStepTypes( + orchestrator, + classOf[BasicDriverConfigurationStep], + classOf[DriverServiceBootstrapStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[DriverInitContainerBootstrapStep]) + } + + test("Submission steps with driver secrets to mount") { + val sparkConf = new SparkConf(false) + .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE) + .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH) + .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH) + val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") + val orchestrator = new DriverConfigOrchestrator( + APP_ID, + LAUNCH_TIME, + Some(mainAppResource), + APP_NAME, + MAIN_CLASS, + APP_ARGS, + sparkConf) + validateStepTypes( + orchestrator, + classOf[BasicDriverConfigurationStep], + classOf[DriverServiceBootstrapStep], + classOf[DriverKubernetesCredentialsStep], + classOf[DependencyResolutionStep], + classOf[DriverMountSecretsStep]) + } + + private def validateStepTypes( + orchestrator: DriverConfigOrchestrator, + types: Class[_ <: DriverConfigurationStep]*): Unit = { + val steps = orchestrator.getAllConfigurationSteps + assert(steps.size === types.size) + assert(steps.map(_.getClass) === types) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala deleted file mode 100644 index 98f9f27..0000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestratorSuite.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.submit - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.Config.DRIVER_CONTAINER_IMAGE -import org.apache.spark.deploy.k8s.submit.steps._ - -class DriverConfigurationStepsOrchestratorSuite extends SparkFunSuite { - - private val NAMESPACE = "default" - private val DRIVER_IMAGE = "driver-image" - private val APP_ID = "spark-app-id" - private val LAUNCH_TIME = 975256L - private val APP_NAME = "spark" - private val MAIN_CLASS = "org.apache.spark.examples.SparkPi" - private val APP_ARGS = Array("arg1", "arg2") - - test("Base submission steps with a main app resource.") { - val sparkConf = new SparkConf(false) - .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE) - val mainAppResource = JavaMainAppResource("local:///var/apps/jars/main.jar") - val orchestrator = new DriverConfigurationStepsOrchestrator( - NAMESPACE, - APP_ID, - LAUNCH_TIME, - Some(mainAppResource), - APP_NAME, - MAIN_CLASS, - APP_ARGS, - sparkConf) - validateStepTypes( - orchestrator, - classOf[BaseDriverConfigurationStep], - classOf[DriverServiceBootstrapStep], - classOf[DriverKubernetesCredentialsStep], - classOf[DependencyResolutionStep] - ) - } - - test("Base submission steps without a main app resource.") { - val sparkConf = new SparkConf(false) - .set(DRIVER_CONTAINER_IMAGE, DRIVER_IMAGE) - val orchestrator = new DriverConfigurationStepsOrchestrator( - NAMESPACE, - APP_ID, - LAUNCH_TIME, - Option.empty, - APP_NAME, - MAIN_CLASS, - APP_ARGS, - sparkConf) - validateStepTypes( - orchestrator, - classOf[BaseDriverConfigurationStep], - classOf[DriverServiceBootstrapStep], - classOf[DriverKubernetesCredentialsStep] - ) - } - - private def validateStepTypes( - orchestrator: DriverConfigurationStepsOrchestrator, - types: Class[_ <: DriverConfigurationStep]*): Unit = { - val steps = orchestrator.getAllConfigurationSteps() - assert(steps.size === types.size) - assert(steps.map(_.getClass) === types) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/SecretVolumeUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/SecretVolumeUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/SecretVolumeUtils.scala new file mode 100644 index 0000000..8388c16 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/SecretVolumeUtils.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{Container, Pod} + +private[spark] object SecretVolumeUtils { + + def podHasVolume(driverPod: Pod, volumeName: String): Boolean = { + driverPod.getSpec.getVolumes.asScala.exists(volume => volume.getName == volumeName) + } + + def containerHasVolume( + driverContainer: Container, + volumeName: String, + mountPath: String): Boolean = { + driverContainer.getVolumeMounts.asScala.exists(volumeMount => + volumeMount.getName == volumeName && volumeMount.getMountPath == mountPath) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala deleted file mode 100644 index f7c1b31..0000000 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStepSuite.scala +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.deploy.k8s.submit.steps - -import scala.collection.JavaConverters._ - -import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder} - -import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec - -class BaseDriverConfigurationStepSuite extends SparkFunSuite { - - private val APP_ID = "spark-app-id" - private val RESOURCE_NAME_PREFIX = "spark" - private val DRIVER_LABELS = Map("labelkey" -> "labelvalue") - private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent" - private val APP_NAME = "spark-test" - private val MAIN_CLASS = "org.apache.spark.examples.SparkPi" - private val APP_ARGS = Array("arg1", "arg2", "arg 3") - private val CUSTOM_ANNOTATION_KEY = "customAnnotation" - private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue" - private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1" - private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2" - - test("Set all possible configurations from the user.") { - val sparkConf = new SparkConf() - .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") - .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar") - .set("spark.driver.cores", "2") - .set(KUBERNETES_DRIVER_LIMIT_CORES, "4") - .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M") - .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L) - .set(DRIVER_CONTAINER_IMAGE, "spark-driver:latest") - .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE) - .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1") - .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2") - - val submissionStep = new BaseDriverConfigurationStep( - APP_ID, - RESOURCE_NAME_PREFIX, - DRIVER_LABELS, - CONTAINER_IMAGE_PULL_POLICY, - APP_NAME, - MAIN_CLASS, - APP_ARGS, - sparkConf) - val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build() - val baseDriverSpec = KubernetesDriverSpec( - driverPod = basePod, - driverContainer = new ContainerBuilder().build(), - driverSparkConf = new SparkConf(false), - otherKubernetesResources = Seq.empty[HasMetadata]) - val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec) - - assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME) - assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest") - assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY) - - assert(preparedDriverSpec.driverContainer.getEnv.size === 7) - val envs = preparedDriverSpec.driverContainer - .getEnv - .asScala - .map(env => (env.getName, env.getValue)) - .toMap - assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar") - assert(envs(ENV_DRIVER_MEMORY) === "256M") - assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS) - assert(envs(ENV_DRIVER_ARGS) === "\"arg1\" \"arg2\" \"arg 3\"") - assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1") - assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2") - - assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar => - envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) && - envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") && - envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP"))) - - val resourceRequirements = preparedDriverSpec.driverContainer.getResources - val requests = resourceRequirements.getRequests.asScala - assert(requests("cpu").getAmount === "2") - assert(requests("memory").getAmount === "256Mi") - val limits = resourceRequirements.getLimits.asScala - assert(limits("memory").getAmount === "456Mi") - assert(limits("cpu").getAmount === "4") - - val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata - assert(driverPodMetadata.getName === "spark-driver-pod") - assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS) - val expectedAnnotations = Map( - CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE, - SPARK_APP_NAME_ANNOTATION -> APP_NAME) - assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations) - assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never") - - val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap - val expectedSparkConf = Map( - KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", - "spark.app.id" -> APP_ID, - KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX) - assert(resolvedSparkConf === expectedSparkConf) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala new file mode 100644 index 0000000..e864c6a --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/BasicDriverConfigurationStepSuite.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +class BasicDriverConfigurationStepSuite extends SparkFunSuite { + + private val APP_ID = "spark-app-id" + private val RESOURCE_NAME_PREFIX = "spark" + private val DRIVER_LABELS = Map("labelkey" -> "labelvalue") + private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent" + private val APP_NAME = "spark-test" + private val MAIN_CLASS = "org.apache.spark.examples.SparkPi" + private val APP_ARGS = Array("arg1", "arg2", "arg 3") + private val CUSTOM_ANNOTATION_KEY = "customAnnotation" + private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue" + private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1" + private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2" + + test("Set all possible configurations from the user.") { + val sparkConf = new SparkConf() + .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod") + .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar") + .set("spark.driver.cores", "2") + .set(KUBERNETES_DRIVER_LIMIT_CORES, "4") + .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M") + .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L) + .set(DRIVER_CONTAINER_IMAGE, "spark-driver:latest") + .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE) + .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1") + .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2") + + val submissionStep = new BasicDriverConfigurationStep( + APP_ID, + RESOURCE_NAME_PREFIX, + DRIVER_LABELS, + CONTAINER_IMAGE_PULL_POLICY, + APP_NAME, + MAIN_CLASS, + APP_ARGS, + sparkConf) + val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build() + val baseDriverSpec = KubernetesDriverSpec( + driverPod = basePod, + driverContainer = new ContainerBuilder().build(), + driverSparkConf = new SparkConf(false), + otherKubernetesResources = Seq.empty[HasMetadata]) + val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec) + + assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME) + assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest") + assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY) + + assert(preparedDriverSpec.driverContainer.getEnv.size === 7) + val envs = preparedDriverSpec.driverContainer + .getEnv + .asScala + .map(env => (env.getName, env.getValue)) + .toMap + assert(envs(ENV_SUBMIT_EXTRA_CLASSPATH) === "/opt/spark/spark-examples.jar") + assert(envs(ENV_DRIVER_MEMORY) === "256M") + assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS) + assert(envs(ENV_DRIVER_ARGS) === "\"arg1\" \"arg2\" \"arg 3\"") + assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1") + assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2") + + assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar => + envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) && + envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") && + envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP"))) + + val resourceRequirements = preparedDriverSpec.driverContainer.getResources + val requests = resourceRequirements.getRequests.asScala + assert(requests("cpu").getAmount === "2") + assert(requests("memory").getAmount === "256Mi") + val limits = resourceRequirements.getLimits.asScala + assert(limits("memory").getAmount === "456Mi") + assert(limits("cpu").getAmount === "4") + + val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata + assert(driverPodMetadata.getName === "spark-driver-pod") + assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS) + val expectedAnnotations = Map( + CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE, + SPARK_APP_NAME_ANNOTATION -> APP_NAME) + assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations) + assert(preparedDriverSpec.driverPod.getSpec.getRestartPolicy === "Never") + + val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap + val expectedSparkConf = Map( + KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", + "spark.app.id" -> APP_ID, + KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX) + assert(resolvedSparkConf === expectedSparkConf) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala new file mode 100644 index 0000000..758871e --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverInitContainerBootstrapStepSuite.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.StringReader +import java.util.Properties + +import scala.collection.JavaConverters._ + +import com.google.common.collect.Maps +import io.fabric8.kubernetes.api.model.{ConfigMap, ContainerBuilder, HasMetadata, PodBuilder, SecretBuilder} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import org.apache.spark.deploy.k8s.submit.steps.initcontainer.{InitContainerConfigurationStep, InitContainerSpec} +import org.apache.spark.util.Utils + +class DriverInitContainerBootstrapStepSuite extends SparkFunSuite { + + private val CONFIG_MAP_NAME = "spark-init-config-map" + private val CONFIG_MAP_KEY = "spark-init-config-map-key" + + test("The init container bootstrap step should use all of the init container steps") { + val baseDriverSpec = KubernetesDriverSpec( + driverPod = new PodBuilder().build(), + driverContainer = new ContainerBuilder().build(), + driverSparkConf = new SparkConf(false), + otherKubernetesResources = Seq.empty[HasMetadata]) + val initContainerSteps = Seq( + FirstTestInitContainerConfigurationStep, + SecondTestInitContainerConfigurationStep) + val bootstrapStep = new DriverInitContainerBootstrapStep( + initContainerSteps, + CONFIG_MAP_NAME, + CONFIG_MAP_KEY) + + val preparedDriverSpec = bootstrapStep.configureDriver(baseDriverSpec) + + assert(preparedDriverSpec.driverPod.getMetadata.getLabels.asScala === + FirstTestInitContainerConfigurationStep.additionalLabels) + val additionalDriverEnv = preparedDriverSpec.driverContainer.getEnv.asScala + assert(additionalDriverEnv.size === 1) + assert(additionalDriverEnv.head.getName === + FirstTestInitContainerConfigurationStep.additionalMainContainerEnvKey) + assert(additionalDriverEnv.head.getValue === + FirstTestInitContainerConfigurationStep.additionalMainContainerEnvValue) + + assert(preparedDriverSpec.otherKubernetesResources.size === 2) + assert(preparedDriverSpec.otherKubernetesResources.contains( + FirstTestInitContainerConfigurationStep.additionalKubernetesResource)) + assert(preparedDriverSpec.otherKubernetesResources.exists { + case configMap: ConfigMap => + val hasMatchingName = configMap.getMetadata.getName == CONFIG_MAP_NAME + val configMapData = configMap.getData.asScala + val hasCorrectNumberOfEntries = configMapData.size == 1 + val initContainerPropertiesRaw = configMapData(CONFIG_MAP_KEY) + val initContainerProperties = new Properties() + Utils.tryWithResource(new StringReader(initContainerPropertiesRaw)) { + initContainerProperties.load(_) + } + val initContainerPropertiesMap = Maps.fromProperties(initContainerProperties).asScala + val expectedInitContainerProperties = Map( + SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyKey -> + SecondTestInitContainerConfigurationStep.additionalInitContainerPropertyValue) + val hasMatchingProperties = initContainerPropertiesMap == expectedInitContainerProperties + hasMatchingName && hasCorrectNumberOfEntries && hasMatchingProperties + + case _ => false + }) + + val initContainers = preparedDriverSpec.driverPod.getSpec.getInitContainers + assert(initContainers.size() === 1) + val initContainerEnv = initContainers.get(0).getEnv.asScala + assert(initContainerEnv.size === 1) + assert(initContainerEnv.head.getName === + SecondTestInitContainerConfigurationStep.additionalInitContainerEnvKey) + assert(initContainerEnv.head.getValue === + SecondTestInitContainerConfigurationStep.additionalInitContainerEnvValue) + + val expectedSparkConf = Map( + INIT_CONTAINER_CONFIG_MAP_NAME.key -> CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY_CONF.key -> CONFIG_MAP_KEY, + SecondTestInitContainerConfigurationStep.additionalDriverSparkConfKey -> + SecondTestInitContainerConfigurationStep.additionalDriverSparkConfValue) + assert(preparedDriverSpec.driverSparkConf.getAll.toMap === expectedSparkConf) + } +} + +private object FirstTestInitContainerConfigurationStep extends InitContainerConfigurationStep { + + val additionalLabels = Map("additionalLabelkey" -> "additionalLabelValue") + val additionalMainContainerEnvKey = "TEST_ENV_MAIN_KEY" + val additionalMainContainerEnvValue = "TEST_ENV_MAIN_VALUE" + val additionalKubernetesResource = new SecretBuilder() + .withNewMetadata() + .withName("test-secret") + .endMetadata() + .addToData("secret-key", "secret-value") + .build() + + override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = { + val driverPod = new PodBuilder(initContainerSpec.driverPod) + .editOrNewMetadata() + .addToLabels(additionalLabels.asJava) + .endMetadata() + .build() + val mainContainer = new ContainerBuilder(initContainerSpec.driverContainer) + .addNewEnv() + .withName(additionalMainContainerEnvKey) + .withValue(additionalMainContainerEnvValue) + .endEnv() + .build() + initContainerSpec.copy( + driverPod = driverPod, + driverContainer = mainContainer, + dependentResources = initContainerSpec.dependentResources ++ + Seq(additionalKubernetesResource)) + } +} + +private object SecondTestInitContainerConfigurationStep extends InitContainerConfigurationStep { + val additionalInitContainerEnvKey = "TEST_ENV_INIT_KEY" + val additionalInitContainerEnvValue = "TEST_ENV_INIT_VALUE" + val additionalInitContainerPropertyKey = "spark.initcontainer.testkey" + val additionalInitContainerPropertyValue = "testvalue" + val additionalDriverSparkConfKey = "spark.driver.testkey" + val additionalDriverSparkConfValue = "spark.driver.testvalue" + + override def configureInitContainer(initContainerSpec: InitContainerSpec): InitContainerSpec = { + val initContainer = new ContainerBuilder(initContainerSpec.initContainer) + .addNewEnv() + .withName(additionalInitContainerEnvKey) + .withValue(additionalInitContainerEnvValue) + .endEnv() + .build() + val initContainerProperties = initContainerSpec.properties ++ + Map(additionalInitContainerPropertyKey -> additionalInitContainerPropertyValue) + val driverSparkConf = initContainerSpec.driverSparkConf ++ + Map(additionalDriverSparkConfKey -> additionalDriverSparkConfValue) + initContainerSpec.copy( + initContainer = initContainer, + properties = initContainerProperties, + driverSparkConf = driverSparkConf) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala new file mode 100644 index 0000000..9ec0cb5 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverMountSecretsStepSuite.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.MountSecretsBootstrap +import org.apache.spark.deploy.k8s.submit.{KubernetesDriverSpec, SecretVolumeUtils} + +class DriverMountSecretsStepSuite extends SparkFunSuite { + + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/driver" + + test("mounts all given secrets") { + val baseDriverSpec = KubernetesDriverSpec.initialSpec(new SparkConf(false)) + val secretNamesToMountPaths = Map( + SECRET_FOO -> SECRET_MOUNT_PATH, + SECRET_BAR -> SECRET_MOUNT_PATH) + + val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths) + val mountSecretsStep = new DriverMountSecretsStep(mountSecretsBootstrap) + val configuredDriverSpec = mountSecretsStep.configureDriver(baseDriverSpec) + val driverPodWithSecretsMounted = configuredDriverSpec.driverPod + val driverContainerWithSecretsMounted = configuredDriverSpec.driverContainer + + Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName => + assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName)) + } + Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName => + assert(SecretVolumeUtils.containerHasVolume( + driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala new file mode 100644 index 0000000..4553f9f --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/BasicInitContainerConfigurationStepSuite.scala @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps.initcontainer + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model._ +import org.mockito.{Mock, MockitoAnnotations} +import org.mockito.Matchers.any +import org.mockito.Mockito.when +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.{InitContainerBootstrap, PodWithDetachedInitContainer} +import org.apache.spark.deploy.k8s.Config._ + +class BasicInitContainerConfigurationStepSuite extends SparkFunSuite with BeforeAndAfter { + + private val SPARK_JARS = Seq( + "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar") + private val SPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt") + private val JARS_DOWNLOAD_PATH = "/var/data/jars" + private val FILES_DOWNLOAD_PATH = "/var/data/files" + private val POD_LABEL = Map("bootstrap" -> "true") + private val INIT_CONTAINER_NAME = "init-container" + private val DRIVER_CONTAINER_NAME = "driver-container" + + @Mock + private var podAndInitContainerBootstrap : InitContainerBootstrap = _ + + before { + MockitoAnnotations.initMocks(this) + when(podAndInitContainerBootstrap.bootstrapInitContainer( + any[PodWithDetachedInitContainer])).thenAnswer(new Answer[PodWithDetachedInitContainer] { + override def answer(invocation: InvocationOnMock) : PodWithDetachedInitContainer = { + val pod = invocation.getArgumentAt(0, classOf[PodWithDetachedInitContainer]) + pod.copy( + pod = new PodBuilder(pod.pod) + .withNewMetadata() + .addToLabels("bootstrap", "true") + .endMetadata() + .withNewSpec().endSpec() + .build(), + initContainer = new ContainerBuilder() + .withName(INIT_CONTAINER_NAME) + .build(), + mainContainer = new ContainerBuilder() + .withName(DRIVER_CONTAINER_NAME) + .build() + )}}) + } + + test("additionalDriverSparkConf with mix of remote files and jars") { + val baseInitStep = new BasicInitContainerConfigurationStep( + SPARK_JARS, + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + podAndInitContainerBootstrap) + val expectedDriverSparkConf = Map( + JARS_DOWNLOAD_LOCATION.key -> JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_LOCATION.key -> FILES_DOWNLOAD_PATH, + INIT_CONTAINER_REMOTE_JARS.key -> "hdfs://localhost:9000/app/jars/jar1.jar", + INIT_CONTAINER_REMOTE_FILES.key -> "hdfs://localhost:9000/app/files/file1.txt") + val initContainerSpec = InitContainerSpec( + Map.empty[String, String], + Map.empty[String, String], + new Container(), + new Container(), + new Pod, + Seq.empty[HasMetadata]) + val returnContainerSpec = baseInitStep.configureInitContainer(initContainerSpec) + assert(expectedDriverSparkConf === returnContainerSpec.properties) + assert(returnContainerSpec.initContainer.getName === INIT_CONTAINER_NAME) + assert(returnContainerSpec.driverContainer.getName === DRIVER_CONTAINER_NAME) + assert(returnContainerSpec.driverPod.getMetadata.getLabels.asScala === POD_LABEL) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala new file mode 100644 index 0000000..20f2e5b --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerConfigOrchestratorSuite.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps.initcontainer + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ + +class InitContainerConfigOrchestratorSuite extends SparkFunSuite { + + private val DOCKER_IMAGE = "init-container" + private val SPARK_JARS = Seq( + "hdfs://localhost:9000/app/jars/jar1.jar", "file:///app/jars/jar2.jar") + private val SPARK_FILES = Seq( + "hdfs://localhost:9000/app/files/file1.txt", "file:///app/files/file2.txt") + private val JARS_DOWNLOAD_PATH = "/var/data/jars" + private val FILES_DOWNLOAD_PATH = "/var/data/files" + private val DOCKER_IMAGE_PULL_POLICY: String = "IfNotPresent" + private val CUSTOM_LABEL_KEY = "customLabel" + private val CUSTOM_LABEL_VALUE = "customLabelValue" + private val INIT_CONTAINER_CONFIG_MAP_NAME = "spark-init-config-map" + private val INIT_CONTAINER_CONFIG_MAP_KEY = "spark-init-config-map-key" + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/init-container" + + test("including basic configuration step") { + val sparkConf = new SparkConf(true) + .set(INIT_CONTAINER_IMAGE, DOCKER_IMAGE) + .set(s"$KUBERNETES_DRIVER_LABEL_PREFIX$CUSTOM_LABEL_KEY", CUSTOM_LABEL_VALUE) + + val orchestrator = new InitContainerConfigOrchestrator( + SPARK_JARS.take(1), + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + val initSteps = orchestrator.getAllConfigurationSteps + assert(initSteps.lengthCompare(1) == 0) + assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep]) + } + + test("including step to mount user-specified secrets") { + val sparkConf = new SparkConf(false) + .set(INIT_CONTAINER_IMAGE, DOCKER_IMAGE) + .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_FOO", SECRET_MOUNT_PATH) + .set(s"$KUBERNETES_DRIVER_SECRETS_PREFIX$SECRET_BAR", SECRET_MOUNT_PATH) + + val orchestrator = new InitContainerConfigOrchestrator( + SPARK_JARS.take(1), + SPARK_FILES, + JARS_DOWNLOAD_PATH, + FILES_DOWNLOAD_PATH, + DOCKER_IMAGE_PULL_POLICY, + INIT_CONTAINER_CONFIG_MAP_NAME, + INIT_CONTAINER_CONFIG_MAP_KEY, + sparkConf) + val initSteps = orchestrator.getAllConfigurationSteps + assert(initSteps.length === 2) + assert(initSteps.head.isInstanceOf[BasicInitContainerConfigurationStep]) + assert(initSteps(1).isInstanceOf[InitContainerMountSecretsStep]) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala new file mode 100644 index 0000000..eab4e17 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/initcontainer/InitContainerMountSecretsStepSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps.initcontainer + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.MountSecretsBootstrap +import org.apache.spark.deploy.k8s.submit.SecretVolumeUtils + +class InitContainerMountSecretsStepSuite extends SparkFunSuite { + + private val SECRET_FOO = "foo" + private val SECRET_BAR = "bar" + private val SECRET_MOUNT_PATH = "/etc/secrets/init-container" + + test("mounts all given secrets") { + val baseInitContainerSpec = InitContainerSpec( + Map.empty, + Map.empty, + new ContainerBuilder().build(), + new ContainerBuilder().build(), + new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build(), + Seq.empty) + val secretNamesToMountPaths = Map( + SECRET_FOO -> SECRET_MOUNT_PATH, + SECRET_BAR -> SECRET_MOUNT_PATH) + + val mountSecretsBootstrap = new MountSecretsBootstrap(secretNamesToMountPaths) + val initContainerMountSecretsStep = new InitContainerMountSecretsStep(mountSecretsBootstrap) + val configuredInitContainerSpec = initContainerMountSecretsStep.configureInitContainer( + baseInitContainerSpec) + + val podWithSecretsMounted = configuredInitContainerSpec.driverPod + val initContainerWithSecretsMounted = configuredInitContainerSpec.initContainer + + Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName => + assert(SecretVolumeUtils.podHasVolume(podWithSecretsMounted, volumeName))) + Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach(volumeName => + assert(SecretVolumeUtils.containerHasVolume( + initContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH))) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala new file mode 100644 index 0000000..6c557ec --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/rest/k8s/SparkPodInitContainerSuite.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.rest.k8s + +import java.io.File +import java.util.UUID + +import com.google.common.base.Charsets +import com.google.common.io.Files +import org.mockito.Mockito +import org.scalatest.BeforeAndAfter +import org.scalatest.mockito.MockitoSugar._ + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.util.Utils + +class SparkPodInitContainerSuite extends SparkFunSuite with BeforeAndAfter { + + private val DOWNLOAD_JARS_SECRET_LOCATION = createTempFile("txt") + private val DOWNLOAD_FILES_SECRET_LOCATION = createTempFile("txt") + + private var downloadJarsDir: File = _ + private var downloadFilesDir: File = _ + private var downloadJarsSecretValue: String = _ + private var downloadFilesSecretValue: String = _ + private var fileFetcher: FileFetcher = _ + + override def beforeAll(): Unit = { + downloadJarsSecretValue = Files.toString( + new File(DOWNLOAD_JARS_SECRET_LOCATION), Charsets.UTF_8) + downloadFilesSecretValue = Files.toString( + new File(DOWNLOAD_FILES_SECRET_LOCATION), Charsets.UTF_8) + } + + before { + downloadJarsDir = Utils.createTempDir() + downloadFilesDir = Utils.createTempDir() + fileFetcher = mock[FileFetcher] + } + + after { + downloadJarsDir.delete() + downloadFilesDir.delete() + } + + test("Downloads from remote server should invoke the file fetcher") { + val sparkConf = getSparkConfForRemoteFileDownloads + val initContainerUnderTest = new SparkPodInitContainer(sparkConf, fileFetcher) + initContainerUnderTest.run() + Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/jar1.jar", downloadJarsDir) + Mockito.verify(fileFetcher).fetchFile("hdfs://localhost:9000/jar2.jar", downloadJarsDir) + Mockito.verify(fileFetcher).fetchFile("http://localhost:9000/file.txt", downloadFilesDir) + } + + private def getSparkConfForRemoteFileDownloads: SparkConf = { + new SparkConf(true) + .set(INIT_CONTAINER_REMOTE_JARS, + "http://localhost:9000/jar1.jar,hdfs://localhost:9000/jar2.jar") + .set(INIT_CONTAINER_REMOTE_FILES, + "http://localhost:9000/file.txt") + .set(JARS_DOWNLOAD_LOCATION, downloadJarsDir.getAbsolutePath) + .set(FILES_DOWNLOAD_LOCATION, downloadFilesDir.getAbsolutePath) + } + + private def createTempFile(extension: String): String = { + val dir = Utils.createTempDir() + val file = new File(dir, s"${UUID.randomUUID().toString}.$extension") + Files.write(UUID.randomUUID().toString, file, Charsets.UTF_8) + file.getAbsolutePath + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala index 3a55d7c..7121a80 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodFactorySuite.scala @@ -18,15 +18,19 @@ package org.apache.spark.scheduler.cluster.k8s import scala.collection.JavaConverters._ -import io.fabric8.kubernetes.api.model.{Pod, _} -import org.mockito.MockitoAnnotations +import io.fabric8.kubernetes.api.model._ +import org.mockito.{AdditionalAnswers, MockitoAnnotations} +import org.mockito.Matchers.any +import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{InitContainerBootstrap, MountSecretsBootstrap, PodWithDetachedInitContainer} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with BeforeAndAfterEach { + private val driverPodName: String = "driver-pod" private val driverPodUid: String = "driver-uid" private val executorPrefix: String = "base" @@ -54,7 +58,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef } test("basic executor pod has reasonable defaults") { - val factory = new ExecutorPodFactoryImpl(baseConf) + val factory = new ExecutorPodFactory(baseConf, None, None, None) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -85,7 +89,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef conf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, "loremipsumdolorsitametvimatelitrefficiendisuscipianturvixlegeresple") - val factory = new ExecutorPodFactoryImpl(conf) + val factory = new ExecutorPodFactory(conf, None, None, None) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) @@ -97,7 +101,7 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar") conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") - val factory = new ExecutorPodFactoryImpl(conf) + val factory = new ExecutorPodFactory(conf, None, None, None) val executor = factory.createExecutorPod( "1", "dummy", "dummy", Seq[(String, String)]("qux" -> "quux"), driverPod, Map[String, Int]()) @@ -108,6 +112,74 @@ class ExecutorPodFactorySuite extends SparkFunSuite with BeforeAndAfter with Bef checkOwnerReferences(executor, driverPodUid) } + test("executor secrets get mounted") { + val conf = baseConf.clone() + + val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1")) + val factory = new ExecutorPodFactory( + conf, + Some(secretsBootstrap), + None, + None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + assert(executor.getSpec.getContainers.size() === 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.size() === 1) + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0).getName + === "secret1-volume") + assert(executor.getSpec.getContainers.get(0).getVolumeMounts.get(0) + .getMountPath === "/var/secret1") + + // check volume mounted. + assert(executor.getSpec.getVolumes.size() === 1) + assert(executor.getSpec.getVolumes.get(0).getSecret.getSecretName === "secret1") + + checkOwnerReferences(executor, driverPodUid) + } + + test("init-container bootstrap step adds an init container") { + val conf = baseConf.clone() + val initContainerBootstrap = mock(classOf[InitContainerBootstrap]) + when(initContainerBootstrap.bootstrapInitContainer( + any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) + + val factory = new ExecutorPodFactory( + conf, + None, + Some(initContainerBootstrap), + None) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + assert(executor.getSpec.getInitContainers.size() === 1) + checkOwnerReferences(executor, driverPodUid) + } + + test("init-container with secrets mount bootstrap") { + val conf = baseConf.clone() + val initContainerBootstrap = mock(classOf[InitContainerBootstrap]) + when(initContainerBootstrap.bootstrapInitContainer( + any(classOf[PodWithDetachedInitContainer]))).thenAnswer(AdditionalAnswers.returnsFirstArg()) + val secretsBootstrap = new MountSecretsBootstrap(Map("secret1" -> "/var/secret1")) + + val factory = new ExecutorPodFactory( + conf, + None, + Some(initContainerBootstrap), + Some(secretsBootstrap)) + val executor = factory.createExecutorPod( + "1", "dummy", "dummy", Seq[(String, String)](), driverPod, Map[String, Int]()) + + assert(executor.getSpec.getInitContainers.size() === 1) + assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0).getName + === "secret1-volume") + assert(executor.getSpec.getInitContainers.get(0).getVolumeMounts.get(0) + .getMountPath === "/var/secret1") + + checkOwnerReferences(executor, driverPodUid) + } + // There is always exactly one controller reference, and it points to the driver pod. private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile index 9b682f8..45fbcd9 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile @@ -22,7 +22,7 @@ FROM spark-base # If this docker file is being used in the context of building your images from a Spark # distribution, the docker build command should be invoked from the top level directory # of the Spark distribution. E.g.: -# docker build -t spark-driver:latest -f kubernetes/dockerfiles/spark-base/Dockerfile . +# docker build -t spark-driver:latest -f kubernetes/dockerfiles/driver/Dockerfile . COPY examples /opt/spark/examples @@ -31,4 +31,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \ if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile index 168cd4c..0f806cf 100644 --- a/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile @@ -22,7 +22,7 @@ FROM spark-base # If this docker file is being used in the context of building your images from a Spark # distribution, the docker build command should be invoked from the top level directory # of the Spark distribution. E.g.: -# docker build -t spark-executor:latest -f kubernetes/dockerfiles/spark-base/Dockerfile . +# docker build -t spark-executor:latest -f kubernetes/dockerfiles/executor/Dockerfile . COPY examples /opt/spark/examples @@ -31,4 +31,5 @@ CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \ if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ + if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \ ${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP http://git-wip-us.apache.org/repos/asf/spark/blob/171f6dda/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile b/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile new file mode 100644 index 0000000..0554931 --- /dev/null +++ b/resource-managers/kubernetes/docker/src/main/dockerfiles/init-container/Dockerfile @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-init:latest -f kubernetes/dockerfiles/init-container/Dockerfile . + +ENTRYPOINT [ "/opt/entrypoint.sh", "/opt/spark/bin/spark-class", "org.apache.spark.deploy.rest.k8s.SparkPodInitContainer" ] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org