Repository: spark Updated Branches: refs/heads/master 8856e9f6a -> 6be272b75
http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 3708864..7e7dc47 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -83,48 +83,21 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => templateVolumeStep) test("Apply fundamental steps all the time.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - JavaMainAppResource(Some("example.jar")), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = None) + val conf = KubernetesTestConf.createDriverConf() validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - DRIVER_CMD_STEP_TYPE) + DRIVER_CMD_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) } test("Apply secrets step if secrets are present.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - JavaMainAppResource(None), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map("secret" -> "secretMountPath"), - Map("EnvName" -> "SecretName:secretKey"), - Map.empty, - Nil, - hadoopConfSpec = None) + val conf = KubernetesTestConf.createDriverConf( + secretEnvNamesToKeyRefs = Map("EnvName" -> "SecretName:secretKey"), 
+ secretNamesToMountPaths = Map("secret" -> "secretMountPath")) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -133,7 +106,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { LOCAL_DIRS_STEP_TYPE, SECRETS_STEP_TYPE, ENV_SECRETS_STEP_TYPE, - DRIVER_CMD_STEP_TYPE) + DRIVER_CMD_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) } test("Apply volumes step if mounts are present.") { @@ -143,22 +117,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { "", false, KubernetesHostPathVolumeConf("/path")) - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - JavaMainAppResource(None), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - volumeSpec :: Nil, - hadoopConfSpec = None) + val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec)) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -166,7 +125,8 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, MOUNT_VOLUMES_STEP_TYPE, - DRIVER_CMD_STEP_TYPE) + DRIVER_CMD_STEP_TYPE, + HADOOP_GLOBAL_STEP_TYPE) } test("Apply volumes step if a mount subpath is present.") { @@ -176,22 +136,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { "foo", false, KubernetesHostPathVolumeConf("/path")) - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - JavaMainAppResource(None), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - volumeSpec :: Nil, - hadoopConfSpec = None) + val conf = KubernetesTestConf.createDriverConf(volumes = Seq(volumeSpec)) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -199,89 +144,14 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, 
MOUNT_VOLUMES_STEP_TYPE, - DRIVER_CMD_STEP_TYPE) - } - - test("Apply template volume step if executor template is present.") { - val sparkConf = spy(new SparkConf(false)) - doReturn(Option("filename")).when(sparkConf) - .get(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE) - val conf = KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(Some("example.jar")), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Option.empty) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, - DRIVER_CMD_STEP_TYPE, - TEMPLATE_VOLUME_STEP_TYPE) - } - - test("Apply HadoopSteps if HADOOP_CONF_DIR is defined.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - JavaMainAppResource(None), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = Some( - HadoopConfSpec( - Some("/var/hadoop-conf"), - None))) - validateStepTypesApplied( - builderUnderTest.buildFromFeatures(conf), - BASIC_STEP_TYPE, - CREDENTIALS_STEP_TYPE, - SERVICE_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE, DRIVER_CMD_STEP_TYPE, HADOOP_GLOBAL_STEP_TYPE) } - test("Apply HadoopSteps if HADOOP_CONF ConfigMap is defined.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesDriverSpecificConf( - JavaMainAppResource(None), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - hadoopConfSpec = Some( - HadoopConfSpec( - None, - Some("pre-defined-configMapName")))) + test("Apply template volume step if executor template is present.") { + val sparkConf = new SparkConf(false) + .set(KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "filename") + val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) 
validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -289,12 +159,16 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, DRIVER_CMD_STEP_TYPE, - HADOOP_GLOBAL_STEP_TYPE) + HADOOP_GLOBAL_STEP_TYPE, + TEMPLATE_VOLUME_STEP_TYPE) } private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, stepTypes: String*) : Unit = { - assert(resolvedSpec.systemProperties.size === stepTypes.size) + val addedProperties = resolvedSpec.systemProperties + .filter { case (k, _) => !k.startsWith("spark.") } + .toMap + assert(addedProperties.keys.toSet === stepTypes.toSet) stepTypes.foreach { stepType => assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType) assert(resolvedSpec.driverKubernetesResources.containsSlice( @@ -314,22 +188,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val sparkConf = new SparkConf(false) .set(CONTAINER_IMAGE, "spark-driver:latest") .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml") - val kubernetesConf = new KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(Some("example.jar")), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Option.empty) + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) val driverSpec = KubernetesDriverBuilder .apply(kubernetesClient, sparkConf) .buildFromFeatures(kubernetesConf) @@ -346,22 +205,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { val sparkConf = new SparkConf(false) .set(CONTAINER_IMAGE, "spark-driver:latest") .set(KUBERNETES_DRIVER_PODTEMPLATE_FILE, "template-file.yaml") - val kubernetesConf = new KubernetesConf( - sparkConf, - KubernetesDriverSpecificConf( - JavaMainAppResource(Some("example.jar")), - "test-app", - "main", - Seq.empty), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - 
Map.empty, - Nil, - Option.empty) + val kubernetesConf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf) val exception = intercept[SparkException] { KubernetesDriverBuilder .apply(kubernetesClient, sparkConf) http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala index 2f984e5..ddf9f67 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala @@ -27,7 +27,7 @@ import org.mockito.stubbing.Answer import org.scalatest.BeforeAndAfter import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesExecutorConf, KubernetesTestConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.Fabric8Aliases._ @@ -79,7 +79,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { when(kubernetesClient.pods()).thenReturn(podOperations) when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) when(driverPodOperations.get).thenReturn(driverPod) - when(executorBuilder.buildFromFeatures(kubernetesConfWithCorrectFields())) + when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]))) .thenAnswer(executorPodAnswer()) snapshotsStore = new 
DeterministicExecutorPodsSnapshotsStore() waitForExecutorPodsClock = new ManualClock(0L) @@ -147,44 +147,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter { private def executorPodAnswer(): Answer[SparkPod] = { new Answer[SparkPod] { override def answer(invocation: InvocationOnMock): SparkPod = { - val k8sConf = invocation.getArgumentAt( - 0, classOf[KubernetesConf[KubernetesExecutorSpecificConf]]) - executorPodWithId(k8sConf.roleSpecificConf.executorId.toInt) + val k8sConf = invocation.getArgumentAt(0, classOf[KubernetesExecutorConf]) + executorPodWithId(k8sConf.executorId.toInt) } } } - - private def kubernetesConfWithCorrectFields(): KubernetesConf[KubernetesExecutorSpecificConf] = - Matchers.argThat(new ArgumentMatcher[KubernetesConf[KubernetesExecutorSpecificConf]] { - override def matches(argument: scala.Any): Boolean = { - if (!argument.isInstanceOf[KubernetesConf[_]]) { - false - } else { - val k8sConf = argument.asInstanceOf[KubernetesConf[KubernetesExecutorSpecificConf]] - val executorSpecificConf = k8sConf.roleSpecificConf - // TODO: HADOOP_CONF_DIR - val expectedK8sConf = KubernetesConf.createExecutorConf( - conf, - executorSpecificConf.executorId, - TEST_SPARK_APP_ID, - Some(driverPod)) - - // Set prefixes to a common string since KUBERNETES_EXECUTOR_POD_NAME_PREFIX - // has not be set for the tests and thus KubernetesConf will use a random - // string for the prefix, based on the app name, and this comparison here will fail. - val k8sConfCopy = k8sConf - .copy(appResourceNamePrefix = "") - .copy(sparkConf = conf) - val expectedK8sConfCopy = expectedK8sConf - .copy(appResourceNamePrefix = "") - .copy(sparkConf = conf) - - k8sConf.sparkConf.getAll.toMap == conf.getAll.toMap && - // Since KubernetesConf.createExecutorConf clones the SparkConf object, force - // deep equality comparison for the SparkConf object and use object equality - // comparison on all other fields. 
- k8sConfCopy == expectedK8sConfCopy - } - } - }) } http://git-wip-us.apache.org/repos/asf/spark/blob/6be272b7/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index a59f6d0..b6a75b1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -16,6 +16,8 @@ */ package org.apache.spark.scheduler.cluster.k8s +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.{Config => _, _} import io.fabric8.kubernetes.client.KubernetesClient import org.mockito.Mockito.{mock, never, verify} @@ -25,6 +27,7 @@ import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.features._ import org.apache.spark.deploy.k8s.submit.PodBuilderSuiteUtils +import org.apache.spark.util.SparkConfWithEnv class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val BASIC_STEP_TYPE = "basic" @@ -64,37 +67,15 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { _ => hadoopSparkUser) test("Basic steps are consistently applied.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesExecutorSpecificConf( - "executor-id", Some(new PodBuilder().build())), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - None) + val conf = KubernetesTestConf.createExecutorConf() validateStepTypesApplied( 
builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE) } test("Apply secrets step if secrets are present.") { - val conf = KubernetesConf( - new SparkConf(false), - KubernetesExecutorSpecificConf( - "executor-id", Some(new PodBuilder().build())), - "prefix", - "appId", - Map.empty, - Map.empty, - Map("secret" -> "secretMountPath"), - Map("secret-name" -> "secret-key"), - Map.empty, - Nil, - None) + val conf = KubernetesTestConf.createExecutorConf( + secretEnvNamesToKeyRefs = Map("secret-name" -> "secret-key"), + secretNamesToMountPaths = Map("secret" -> "secretMountPath")) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -110,19 +91,8 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { "", false, KubernetesHostPathVolumeConf("/checkpoint")) - val conf = KubernetesConf( - new SparkConf(false), - KubernetesExecutorSpecificConf( - "executor-id", Some(new PodBuilder().build())), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - volumeSpec :: Nil, - None) + val conf = KubernetesTestConf.createExecutorConf( + volumes = Seq(volumeSpec)) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -132,25 +102,10 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { test("Apply basicHadoop step if HADOOP_CONF_DIR is defined") { // HADOOP_DELEGATION_TOKEN - val HADOOP_CREDS_PREFIX = "spark.security.credentials." 
- val HADOOPFS_PROVIDER = s"$HADOOP_CREDS_PREFIX.hadoopfs.enabled" - val conf = KubernetesConf( - new SparkConf(false) + val conf = KubernetesTestConf.createExecutorConf( + sparkConf = new SparkConfWithEnv(Map("HADOOP_CONF_DIR" -> "/var/hadoop-conf")) .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") - .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name") - .set(KERBEROS_SPARK_USER_NAME, "spark-user") - .set(HADOOPFS_PROVIDER, "true"), - KubernetesExecutorSpecificConf( - "executor-id", Some(new PodBuilder().build())), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Some(HadoopConfSpec(Some("/var/hadoop-conf"), None))) + .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name")) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -160,24 +115,13 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { } test("Apply kerberos step if DT secrets created") { - val conf = KubernetesConf( - new SparkConf(false) + val conf = KubernetesTestConf.createExecutorConf( + sparkConf = new SparkConf(false) .set(HADOOP_CONFIG_MAP_NAME, "hadoop-conf-map-name") .set(KRB5_CONFIG_MAP_NAME, "krb5-conf-map-name") .set(KERBEROS_SPARK_USER_NAME, "spark-user") .set(KERBEROS_DT_SECRET_NAME, "dt-secret") - .set(KERBEROS_DT_SECRET_KEY, "dt-key"), - KubernetesExecutorSpecificConf( - "executor-id", Some(new PodBuilder().build())), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Some(HadoopConfSpec(None, Some("pre-defined-onfigMapName")))) + .set(KERBEROS_DT_SECRET_KEY, "dt-key" )) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -187,10 +131,7 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { } private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = { - assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size) - stepTypes.foreach { stepType => - 
assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType) - } + assert(resolvedPod.pod.getMetadata.getLabels.asScala.keys.toSet === stepTypes.toSet) } test("Starts with empty executor pod if template is not specified") { @@ -205,25 +146,14 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { .set("spark.driver.host", "https://driver.host.com") .set(Config.CONTAINER_IMAGE, "spark-executor:latest") .set(Config.KUBERNETES_EXECUTOR_PODTEMPLATE_FILE, "template-file.yaml") - val kubernetesConf = KubernetesConf( - sparkConf, - KubernetesExecutorSpecificConf( - "executor-id", Some(new PodBuilder() - .withNewMetadata() + val kubernetesConf = KubernetesTestConf.createExecutorConf( + sparkConf = sparkConf, + driverPod = Some(new PodBuilder() + .withNewMetadata() .withName("driver") .endMetadata() - .build())), - "prefix", - "appId", - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Map.empty, - Nil, - Option.empty) - val sparkPod = KubernetesExecutorBuilder - .apply(kubernetesClient, sparkConf) + .build())) + val sparkPod = KubernetesExecutorBuilder(kubernetesClient, sparkConf) .buildFromFeatures(kubernetesConf) PodBuilderSuiteUtils.verifyPodWithSupportedFeatures(sparkPod) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org