This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new e747bcf [SPARK-48017] Add Spark application submission worker for operator e747bcf is described below commit e747bcfab106b828bbd9f2d44968698e5dce3c33 Author: zhou-jiang <zhou_ji...@apple.com> AuthorDate: Mon May 20 10:41:48 2024 -0700 [SPARK-48017] Add Spark application submission worker for operator ### What changes were proposed in this pull request? This is a breakdown PR of #2 - adding a submission worker implementation for SparkApplication. ### Why are the changes needed? Spark Operator needs a submission worker to convert its abstraction (the SparkApplication API) into k8s resource specs. This is a light-weight implementation based on native k8s integration. As of now, it is based on Spark 4.0.0-preview1, but it is expected to serve all Spark LTS versions. This is feasible because the worker only covers spec generation; Spark core jars are still brought in by application images. E2E tests will be set up with the operator later to verify that. Per the SPIP doc, future operator version(s) may add more submission worker implementations based on different Spark versions to become 100% version-agnostic, at the cost of keeping multiple workers on standby. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test coverage. ### Was this patch authored or co-authored using generative AI tooling? no Closes #10 from jiangzho/worker. 
Authored-by: zhou-jiang <zhou_ji...@apple.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- build.gradle | 4 + gradle.properties | 7 + settings.gradle | 1 + spark-operator-api/build.gradle | 1 + .../spark/k8s/operator/utils/ModelUtils.java | 9 + spark-submission-worker/build.gradle | 18 ++ .../spark/k8s/operator/SparkAppDriverConf.java | 73 +++++++ .../spark/k8s/operator/SparkAppResourceSpec.java | 129 ++++++++++++ .../k8s/operator/SparkAppSubmissionWorker.java | 175 +++++++++++++++++ .../spark/k8s/operator/SparkAppDriverConfTest.java | 75 +++++++ .../k8s/operator/SparkAppResourceSpecTest.java | 137 +++++++++++++ .../k8s/operator/SparkAppSubmissionWorkerTest.java | 218 +++++++++++++++++++++ 12 files changed, 847 insertions(+) diff --git a/build.gradle b/build.gradle index a6c1701..c0c75d0 100644 --- a/build.gradle +++ b/build.gradle @@ -25,6 +25,10 @@ subprojects { repositories { mavenCentral() + // TODO(SPARK-48326) Upgrade submission worker base Spark version to 4.0.0-preview2 + maven { + url "https://repository.apache.org/content/repositories/orgapachespark-1454/" + } } apply plugin: 'checkstyle' diff --git a/gradle.properties b/gradle.properties index 2606179..ffa8302 100644 --- a/gradle.properties +++ b/gradle.properties @@ -18,17 +18,24 @@ group=org.apache.spark.k8s.operator version=0.1.0 +# Caution: fabric8 version should be aligned with Spark dependency fabric8Version=6.12.1 commonsLang3Version=3.14.0 commonsIOVersion=2.16.1 lombokVersion=1.18.32 +# Spark +scalaVersion=2.13 +# TODO(SPARK-48326) Upgrade submission worker base Spark version to 4.0.0-preview2 +sparkVersion=4.0.0-preview1 + # Logging log4jVersion=2.22.1 # Test junitVersion=5.10.2 jacocoVersion=0.8.12 +mockitoVersion=5.11.0 # Build Analysis checkstyleVersion=10.15.0 diff --git a/settings.gradle b/settings.gradle index 69e7827..6808ec7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,2 +1,3 @@ rootProject.name = 'apache-spark-kubernetes-operator' include 'spark-operator-api' 
+include 'spark-submission-worker' diff --git a/spark-operator-api/build.gradle b/spark-operator-api/build.gradle index b57beca..696415f 100644 --- a/spark-operator-api/build.gradle +++ b/spark-operator-api/build.gradle @@ -18,6 +18,7 @@ dependencies { testImplementation platform("org.junit:junit-bom:$junitVersion") testImplementation 'org.junit.jupiter:junit-jupiter' + testRuntimeOnly "org.junit.platform:junit-platform-launcher" } test { diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java index 454e706..03d84be 100644 --- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java +++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java @@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodTemplateSpec; import org.apache.commons.lang3.StringUtils; +import org.apache.spark.k8s.operator.SparkApplication; import org.apache.spark.k8s.operator.spec.ApplicationSpec; public class ModelUtils { @@ -107,4 +108,12 @@ public class ModelUtils { && applicationSpec.getExecutorSpec() != null && applicationSpec.getExecutorSpec().getPodTemplateSpec() != null; } + + public static long getAttemptId(final SparkApplication app) { + long attemptId = 0L; + if (app.getStatus() != null && app.getStatus().getCurrentAttemptSummary() != null) { + attemptId = app.getStatus().getCurrentAttemptSummary().getAttemptInfo().getId(); + } + return attemptId; + } } diff --git a/spark-submission-worker/build.gradle b/spark-submission-worker/build.gradle new file mode 100644 index 0000000..da78026 --- /dev/null +++ b/spark-submission-worker/build.gradle @@ -0,0 +1,18 @@ +dependencies { + implementation project(":spark-operator-api") + + implementation("org.apache.spark:spark-kubernetes_$scalaVersion:$sparkVersion") + + 
compileOnly("org.projectlombok:lombok:$lombokVersion") + annotationProcessor("org.projectlombok:lombok:$lombokVersion") + + testImplementation platform("org.junit:junit-bom:$junitVersion") + testImplementation "org.mockito:mockito-core:$mockitoVersion" + testImplementation "org.junit.jupiter:junit-jupiter:$junitVersion" + testImplementation "io.fabric8:kubernetes-server-mock:$fabric8Version" + testRuntimeOnly "org.junit.platform:junit-platform-launcher" +} + +test { + useJUnitPlatform() +} diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java new file mode 100644 index 0000000..f2870a2 --- /dev/null +++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import scala.Option; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.Config; +import org.apache.spark.deploy.k8s.KubernetesDriverConf; +import org.apache.spark.deploy.k8s.KubernetesVolumeUtils; +import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils; +import org.apache.spark.deploy.k8s.submit.MainAppResource; + +public class SparkAppDriverConf extends KubernetesDriverConf { + private SparkAppDriverConf( + SparkConf sparkConf, + String appId, + MainAppResource mainAppResource, + String mainClass, + String[] appArgs, + Option<String> proxyUser) { + super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, null); + } + + public static SparkAppDriverConf create( + SparkConf sparkConf, + String appId, + MainAppResource mainAppResource, + String mainClass, + String[] appArgs, + Option<String> proxyUser) { + // pre-create check only + KubernetesVolumeUtils.parseVolumesWithPrefix( + sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX()); + return new SparkAppDriverConf(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser); + } + + /** Application managed by operator has a deterministic prefix */ + @Override + public String resourceNamePrefix() { + return appId(); + } + + /** + * Create the name to be used by driver config map. This consists of `resourceNamePrefix` and Spark + * instance type (driver). Operator proposes `resourceNamePrefix` which leaves naming length margin + * for sub-resources to be qualified as DNS subdomain or label. In addition, the overall config + * name length is governed by `KubernetesClientUtils.configMapName` - which ensures the name + * length meets requirements as DNS subdomain name. 
+ * + * @return proposed name to be used by driver config map + */ + public String configMapNameDriver() { + return KubernetesClientUtils.configMapName(String.format("%s-spark-drv", resourceNamePrefix())); + } +} diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java new file mode 100644 index 0000000..101b29a --- /dev/null +++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.util.ArrayList; +import java.util.List; + +import scala.Tuple2; +import scala.collection.immutable.HashMap; +import scala.collection.immutable.Map; +import scala.jdk.CollectionConverters; + +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import lombok.Getter; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.deploy.k8s.Config; +import org.apache.spark.deploy.k8s.Constants; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.SparkPod; +import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils; + +/** + * Resembles resources that would be directly launched by operator. Based on resolved + * org.apache.spark.deploy.k8s.KubernetesDriverSpec, it: + * + * <ul> + * <li>Add ConfigMap as a resource for driver + * <li>Converts scala types to Java for easier reference from operator + * </ul> + * + * <p>This is not thread safe and not expected to be shared among reconciler threads + */ +public class SparkAppResourceSpec { + @Getter private final Pod configuredPod; + @Getter private final List<HasMetadata> driverPreResources; + @Getter private final List<HasMetadata> driverResources; + private final SparkAppDriverConf kubernetesDriverConf; + + public SparkAppResourceSpec( + SparkAppDriverConf kubernetesDriverConf, KubernetesDriverSpec kubernetesDriverSpec) { + this.kubernetesDriverConf = kubernetesDriverConf; + String namespace = kubernetesDriverConf.sparkConf().get(Config.KUBERNETES_NAMESPACE().key()); + Map<String, String> confFilesMap = + KubernetesClientUtils.buildSparkConfDirFilesMap( + kubernetesDriverConf.configMapNameDriver(), + kubernetesDriverConf.sparkConf(), + kubernetesDriverSpec.systemProperties()) + .$plus(new 
Tuple2<>(Config.KUBERNETES_NAMESPACE().key(), namespace)); + SparkPod sparkPod = addConfigMap(kubernetesDriverSpec.pod(), confFilesMap); + this.configuredPod = + new PodBuilder(sparkPod.pod()) + .editSpec() + .addToContainers(sparkPod.container()) + .endSpec() + .build(); + this.driverPreResources = + new ArrayList<>( + CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverPreKubernetesResources()) + .asJava()); + this.driverResources = + new ArrayList<>( + CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverKubernetesResources()) + .asJava()); + this.driverResources.add( + KubernetesClientUtils.buildConfigMap( + kubernetesDriverConf.configMapNameDriver(), confFilesMap, new HashMap<>())); + this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace)); + this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace)); + } + + private void setNamespaceIfMissing(HasMetadata resource, String namespace) { + if (StringUtils.isNotEmpty(resource.getMetadata().getNamespace())) { + return; + } + resource.getMetadata().setNamespace(namespace); + } + + private SparkPod addConfigMap(SparkPod pod, Map<String, String> confFilesMap) { + Container containerWithConfigMapVolume = + new ContainerBuilder(pod.container()) + .addNewEnv() + .withName(Constants.ENV_SPARK_CONF_DIR()) + .withValue(Constants.SPARK_CONF_DIR_INTERNAL()) + .endEnv() + .addNewVolumeMount() + .withName(Constants.SPARK_CONF_VOLUME_DRIVER()) + .withMountPath(Constants.SPARK_CONF_DIR_INTERNAL()) + .endVolumeMount() + .build(); + Pod podWithConfigMapVolume = + new PodBuilder(pod.pod()) + .editSpec() + .addNewVolume() + .withName(Constants.SPARK_CONF_VOLUME_DRIVER()) + .withNewConfigMap() + .withItems( + CollectionConverters.SeqHasAsJava( + KubernetesClientUtils.buildKeyToPathObjects(confFilesMap)) + .asJava()) + .withName(kubernetesDriverConf.configMapNameDriver()) + .endConfigMap() + .endVolume() + .endSpec() + .build(); + return new SparkPod(podWithConfigMapVolume, 
containerWithConfigMapVolume); + } +} diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java new file mode 100644 index 0000000..57013cb --- /dev/null +++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import java.math.BigInteger; +import java.util.Map; + +import scala.Option; + +import io.fabric8.kubernetes.client.KubernetesClient; +import org.apache.commons.codec.digest.DigestUtils; +import org.apache.commons.collections.MapUtils; +import org.apache.commons.lang3.StringUtils; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder; +import org.apache.spark.deploy.k8s.submit.MainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.utils.ModelUtils; + +/** + * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from + * SparkApplication instead of starting a separate spark-submit process + */ +public class SparkAppSubmissionWorker { + // Default length limit for generated app id. Generated id is used as resource-prefix when + // user-provided id is too long for this purpose. This applies to all resources associated with + // the Spark app (including k8s service which has different naming length limit). Thus, we + // limit the generated id to 46 chars to leave some margin for spark resource prefix and suffix + // (e.g. 'spark-', '-driver-svc', 
etc) + public static final int DEFAULT_ID_LENGTH_LIMIT = 46; + // Default length limit to be applied to the hash-based part of generated id + public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36; + // Radix value used when generating hash-based identifier + public static final int DEFAULT_ENCODE_BASE = 36; + public static final String DEFAULT_MASTER_URL_PREFIX = "k8s://"; + public static final String MASTER_URL_PREFIX_PROPS_NAME = "spark.master.url.prefix"; + + /** + * Build secondary resource spec for given app with Spark developer API, with defaults / overrides + * as: + * + * <ul> + * <li>spark.kubernetes.namespace - if provided and the provided value is different from the + * namespace that the SparkApp resides in, it would be force override to the same value as + * the SparkApp custom resource. + * <li>spark.jars - if not provided, this would be set to the value in .spec.jars + * <li>spark.submit.pyFiles - if not provided, this would be set to the value in .spec.PyFiles + * <li>spark.master - if not provided, this would be automatically built with + * KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT, and prefix 'k8s://' for native Spark + * ExternalClusterManager. It is possible to invoke custom ClusterManager: to do so, either + * set 'spark.master', or set `spark.master.url.prefix` so submission worker can build + * master url based on it. + * </ul> + * + * @param app the SparkApp resource + * @param client k8s client + * @param confOverrides key-value pairs of conf overrides for the given app. 
+ * @return secondary resources spec as `SparkAppResourceSpec` + */ + public SparkAppResourceSpec getResourceSpec( + SparkApplication app, KubernetesClient client, Map<String, String> confOverrides) { + SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides); + return buildResourceSpec(appDriverConf, client); + } + + protected SparkAppDriverConf buildDriverConf( + SparkApplication app, Map<String, String> confOverrides) { + ApplicationSpec applicationSpec = app.getSpec(); + SparkConf effectiveSparkConf = new SparkConf(); + if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) { + for (String confKey : applicationSpec.getSparkConf().keySet()) { + effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey)); + } + } + if (MapUtils.isNotEmpty(confOverrides)) { + for (Map.Entry<String, String> entry : confOverrides.entrySet()) { + effectiveSparkConf.set(entry.getKey(), entry.getValue()); + } + } + effectiveSparkConf.set("spark.kubernetes.namespace", app.getMetadata().getNamespace()); + MainAppResource primaryResource = new JavaMainAppResource(Option.empty()); + if (StringUtils.isNotEmpty(applicationSpec.getJars())) { + primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); + effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); + } else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { + primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); + } else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) { + primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles()); + } + String sparkMasterUrlPrefix = + effectiveSparkConf.get(MASTER_URL_PREFIX_PROPS_NAME, DEFAULT_MASTER_URL_PREFIX); + effectiveSparkConf.setIfMissing( + "spark.master", + sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT"); + String appId = 
generateSparkAppId(app); + effectiveSparkConf.setIfMissing("spark.app.id", appId); + return SparkAppDriverConf.create( + effectiveSparkConf, + appId, + primaryResource, + applicationSpec.getMainClass(), + applicationSpec.getDriverArgs().toArray(String[]::new), + Option.apply(applicationSpec.getProxyUser())); + } + + protected SparkAppResourceSpec buildResourceSpec( + SparkAppDriverConf kubernetesDriverConf, KubernetesClient client) { + KubernetesDriverBuilder builder = new KubernetesDriverBuilder(); + KubernetesDriverSpec kubernetesDriverSpec = + builder.buildFromFeatures(kubernetesDriverConf, client); + return new SparkAppResourceSpec(kubernetesDriverConf, kubernetesDriverSpec); + } + + /** + * Generate a Spark application id. Similar to `KubernetesConf.getKubernetesAppId()`. This is + * deterministic per attempt per Spark App in order to ensure operator reconciliation idempotency + */ + public static String generateSparkAppId(final SparkApplication app) { + long attemptId = ModelUtils.getAttemptId(app); + String preferredId = String.format("%s-%d", app.getMetadata().getName(), attemptId); + if (preferredId.length() > DEFAULT_ID_LENGTH_LIMIT) { + int preferredIdPrefixLength = + DEFAULT_ID_LENGTH_LIMIT - DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT - 1; + String preferredIdPrefix = preferredId.substring(0, preferredIdPrefixLength); + return generateHashBasedId( + preferredIdPrefix, + app.getMetadata().getNamespace(), + app.getMetadata().getName(), + String.valueOf(attemptId)); + } else { + return preferredId; + } + } + + /** + * Generate a hash-based id with given identifiers. The hash part would have a length-limit of + * `DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT`. In addition, prefix would be applied upon + * generated hash. + * + * @param prefix string prefix to be applied to generated hash-based id. 
+ * @param identifiers keys to generate hash + * @return generated hash-based id + */ + public static String generateHashBasedId(final String prefix, final String... identifiers) { + String sha256Hash = + new BigInteger(1, DigestUtils.sha256(String.join("/", identifiers))) + .toString(DEFAULT_ENCODE_BASE); + String truncatedIdentifiersHash = + sha256Hash.substring(0, DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT); + return String.join("-", prefix, truncatedIdentifiersHash); + } +} diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java new file mode 100644 index 0000000..d19cb05 --- /dev/null +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +import java.util.UUID; + +import scala.Option; + +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; + +class SparkAppDriverConfTest { + @Test + void testResourceNamePrefix() { + // Resource prefix shall be deterministic per SparkApp per attempt + SparkConf sparkConf = new SparkConf(); + sparkConf.set("foo", "bar"); + sparkConf.set("spark.executor.instances", "1"); + String appId = UUID.randomUUID().toString(); + SparkAppDriverConf sparkAppDriverConf = + SparkAppDriverConf.create( + sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty()); + String resourcePrefix = sparkAppDriverConf.resourceNamePrefix(); + assertEquals( + resourcePrefix, + appId, + "Secondary resource prfix should be the same as app id, " + + "but different values are detected"); + assertTrue( + sparkAppDriverConf.configMapNameDriver().contains(resourcePrefix), + "ConfigMap name" + " should include secondary resource prefix"); + assertTrue( + sparkAppDriverConf.driverServiceName().contains(resourcePrefix), + "Driver service " + "name should include secondary resource prefix"); + } + + @Test + void testConfigMapNameDriver() { + SparkConf sparkConf = new SparkConf(); + sparkConf.set("foo", "bar"); + sparkConf.set("spark.executor.instances", "1"); + String appId = RandomStringUtils.randomAlphabetic(1000); + SparkAppDriverConf sparkAppDriverConf = + SparkAppDriverConf.create( + sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty()); + String configMapNameDriver = sparkAppDriverConf.configMapNameDriver(); + assertTrue( + configMapNameDriver.length() <= 253, + "config map name length should always comply k8s 
DNS subdomain length"); + } +} diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java new file mode 100644 index 0000000..929f84e --- /dev/null +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.List; + +import scala.collection.immutable.HashMap; +import scala.collection.immutable.Seq; +import scala.jdk.javaapi.CollectionConverters; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.ContainerBuilder; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeMount; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.KubernetesDriverSpec; +import org.apache.spark.deploy.k8s.SparkPod; + +class SparkAppResourceSpecTest { + + @Test + void testDriverResourceIncludesConfigMap() { + SparkAppDriverConf mockConf = mock(SparkAppDriverConf.class); + when(mockConf.configMapNameDriver()).thenReturn("foo-configmap"); + when(mockConf.sparkConf()) + .thenReturn(new SparkConf().set("spark.kubernetes.namespace", "foo-namespace")); + + KubernetesDriverSpec mockSpec = mock(KubernetesDriverSpec.class); + Pod driver = buildBasicPod("driver"); + SparkPod sparkPod = new SparkPod(driver, buildBasicContainer()); + + // Add some mock resources and pre-resources + Pod pod1 = buildBasicPod("pod-1"); + Pod pod2 = buildBasicPod("pod-2"); + List<HasMetadata> preResourceList = Collections.singletonList(pod1); + List<HasMetadata> resourceList = Collections.singletonList(pod2); + Seq<HasMetadata> preResourceSeq = CollectionConverters.asScala(preResourceList).toList(); + Seq<HasMetadata> resourceSeq = CollectionConverters.asScala(resourceList).toList(); + when(mockSpec.driverKubernetesResources()).thenReturn(resourceSeq); + 
when(mockSpec.driverPreKubernetesResources()).thenReturn(preResourceSeq); + when(mockSpec.pod()).thenReturn(sparkPod); + when(mockSpec.systemProperties()).thenReturn(new HashMap<>()); + + SparkAppResourceSpec appResourceSpec = new SparkAppResourceSpec(mockConf, mockSpec); + + Assertions.assertEquals(2, appResourceSpec.getDriverResources().size()); + Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size()); + Assertions.assertEquals(Pod.class, appResourceSpec.getDriverResources().get(0).getClass()); + Assertions.assertEquals( + ConfigMap.class, appResourceSpec.getDriverResources().get(1).getClass()); + Assertions.assertEquals(pod1, appResourceSpec.getDriverPreResources().get(0)); + Assertions.assertEquals(pod2, appResourceSpec.getDriverResources().get(0)); + + ConfigMap proposedConfigMap = (ConfigMap) appResourceSpec.getDriverResources().get(1); + Assertions.assertEquals("foo-configmap", proposedConfigMap.getMetadata().getName()); + Assertions.assertEquals( + "foo-namespace", proposedConfigMap.getData().get("spark.kubernetes.namespace")); + Assertions.assertEquals("foo-namespace", proposedConfigMap.getMetadata().getNamespace()); + + Assertions.assertEquals(2, appResourceSpec.getConfiguredPod().getSpec().getVolumes().size()); + Volume proposedConfigVolume = appResourceSpec.getConfiguredPod().getSpec().getVolumes().get(1); + Assertions.assertEquals("foo-configmap", proposedConfigVolume.getConfigMap().getName()); + + Assertions.assertEquals(2, appResourceSpec.getConfiguredPod().getSpec().getContainers().size()); + Assertions.assertEquals( + 2, + appResourceSpec + .getConfiguredPod() + .getSpec() + .getContainers() + .get(1) + .getVolumeMounts() + .size()); + VolumeMount proposedConfigVolumeMount = + appResourceSpec + .getConfiguredPod() + .getSpec() + .getContainers() + .get(1) + .getVolumeMounts() + .get(1); + Assertions.assertEquals(proposedConfigVolume.getName(), proposedConfigVolumeMount.getName()); + } + + protected Container 
buildBasicContainer() { + return new ContainerBuilder() + .withName("foo-container") + .addNewVolumeMount() + .withName("placeholder") + .endVolumeMount() + .build(); + } + + protected Pod buildBasicPod(String name) { + return new PodBuilder() + .withNewMetadata() + .withName(name) + .endMetadata() + .withNewSpec() + .addNewContainer() + .withName("placeholder") + .endContainer() + .addNewVolume() + .withName("placeholder") + .endVolume() + .endSpec() + .build(); + } +} diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java new file mode 100644 index 0000000..caceed0 --- /dev/null +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java @@ -0,0 +1,218 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.k8s.operator; + +import static org.apache.spark.k8s.operator.SparkAppSubmissionWorker.DEFAULT_ID_LENGTH_LIMIT; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.when; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; + +import org.apache.spark.SparkConf; +import org.apache.spark.deploy.k8s.submit.JavaMainAppResource; +import org.apache.spark.deploy.k8s.submit.PythonMainAppResource; +import org.apache.spark.deploy.k8s.submit.RMainAppResource; +import org.apache.spark.k8s.operator.spec.ApplicationSpec; +import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary; +import org.apache.spark.k8s.operator.status.ApplicationStatus; +import org.apache.spark.k8s.operator.status.AttemptInfo; + +class SparkAppSubmissionWorkerTest { + @Test + void buildDriverConfShouldApplySpecAndPropertiesOverride() { + Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>(); + try (MockedConstruction<SparkAppDriverConf> mocked = + mockConstruction( + SparkAppDriverConf.class, + (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) { + SparkApplication mockApp = mock(SparkApplication.class); + ApplicationSpec mockSpec = mock(ApplicationSpec.class); + ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build(); + Map<String, String> appProps = new HashMap<>(); + appProps.put("foo", "bar"); + appProps.put("spark.executor.instances", "1"); + appProps.put("spark.kubernetes.namespace", "ns2"); + Map<String, String> overrides = new 
HashMap<>(); + overrides.put("spark.executor.instances", "5"); + overrides.put("spark.kubernetes.namespace", "ns3"); + when(mockSpec.getSparkConf()).thenReturn(appProps); + when(mockApp.getSpec()).thenReturn(mockSpec); + when(mockApp.getMetadata()).thenReturn(appMeta); + when(mockSpec.getProxyUser()).thenReturn("foo-user"); + when(mockSpec.getMainClass()).thenReturn("foo-class"); + when(mockSpec.getDriverArgs()).thenReturn(List.of("a", "b")); + + SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker(); + SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, overrides); + Assertions.assertEquals(6, constructorArgs.get(conf).size()); + + // validate SparkConf with override + Assertions.assertTrue(constructorArgs.get(conf).get(0) instanceof SparkConf); + SparkConf createdConf = (SparkConf) constructorArgs.get(conf).get(0); + Assertions.assertEquals("bar", createdConf.get("foo")); + Assertions.assertEquals("5", createdConf.get("spark.executor.instances")); + + Assertions.assertEquals( + "ns1", + createdConf.get("spark.kubernetes.namespace"), + "namespace from CR takes highest precedence"); + + // validate main resources + Assertions.assertTrue(constructorArgs.get(conf).get(2) instanceof JavaMainAppResource); + JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(2); + Assertions.assertTrue(mainResource.primaryResource().isEmpty()); + + Assertions.assertEquals("foo-class", constructorArgs.get(conf).get(3)); + + Assertions.assertTrue(constructorArgs.get(conf).get(4) instanceof String[]); + String[] capturedArgs = (String[]) constructorArgs.get(conf).get(4); + Assertions.assertEquals(2, capturedArgs.length); + Assertions.assertEquals("a", capturedArgs[0]); + Assertions.assertEquals("b", capturedArgs[1]); + } + } + + @Test + void buildDriverConfForPythonApp() { + Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>(); + try (MockedConstruction<SparkAppDriverConf> mocked = + 
mockConstruction( + SparkAppDriverConf.class, + (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) { + SparkApplication mockApp = mock(SparkApplication.class); + ApplicationSpec mockSpec = mock(ApplicationSpec.class); + ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build(); + when(mockApp.getSpec()).thenReturn(mockSpec); + when(mockApp.getMetadata()).thenReturn(appMeta); + when(mockSpec.getPyFiles()).thenReturn("foo"); + + SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker(); + SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap()); + Assertions.assertEquals(6, constructorArgs.get(conf).size()); + + // validate main resources + Assertions.assertTrue(constructorArgs.get(conf).get(2) instanceof PythonMainAppResource); + PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2); + Assertions.assertEquals("foo", mainResource.primaryResource()); + } + } + + @Test + void buildDriverConfForRApp() { + Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>(); + try (MockedConstruction<SparkAppDriverConf> mocked = + mockConstruction( + SparkAppDriverConf.class, + (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) { + SparkApplication mockApp = mock(SparkApplication.class); + ApplicationSpec mockSpec = mock(ApplicationSpec.class); + ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build(); + when(mockApp.getSpec()).thenReturn(mockSpec); + when(mockApp.getMetadata()).thenReturn(appMeta); + when(mockSpec.getSparkRFiles()).thenReturn("foo"); + + SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker(); + SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap()); + Assertions.assertEquals(6, constructorArgs.get(conf).size()); + + // validate main resources + 
Assertions.assertTrue(constructorArgs.get(conf).get(2) instanceof RMainAppResource); + RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(2); + Assertions.assertEquals("foo", mainResource.primaryResource()); + } + } + + @Test + void sparkAppIdShouldBeDeterministicPerAppPerAttempt() { + SparkApplication mockApp1 = mock(SparkApplication.class); + SparkApplication mockApp2 = mock(SparkApplication.class); + ApplicationStatus mockStatus1 = mock(ApplicationStatus.class); + ApplicationStatus mockStatus2 = mock(ApplicationStatus.class); + String appName1 = "app1"; + String appName2 = "app2"; + ObjectMeta appMeta1 = new ObjectMetaBuilder().withName(appName1).withNamespace("ns").build(); + ObjectMeta appMeta2 = new ObjectMetaBuilder().withName(appName2).withNamespace("ns").build(); + when(mockApp1.getMetadata()).thenReturn(appMeta1); + when(mockApp2.getMetadata()).thenReturn(appMeta2); + when(mockApp1.getStatus()).thenReturn(mockStatus1); + when(mockApp2.getStatus()).thenReturn(mockStatus2); + + String appId1 = SparkAppSubmissionWorker.generateSparkAppId(mockApp1); + String appId2 = SparkAppSubmissionWorker.generateSparkAppId(mockApp2); + + Assertions.assertNotEquals(appId1, appId2); + Assertions.assertTrue(appId1.contains(appName1)); + Assertions.assertTrue(appId1.length() <= DEFAULT_ID_LENGTH_LIMIT); + Assertions.assertTrue(appId2.length() <= DEFAULT_ID_LENGTH_LIMIT); + // multiple invoke shall give same result + Assertions.assertEquals( + appId1, + SparkAppSubmissionWorker.generateSparkAppId(mockApp1), + "Multiple invoke of generateSparkAppId shall give same result."); + Assertions.assertEquals( + appId2, + SparkAppSubmissionWorker.generateSparkAppId(mockApp2), + "Multiple invoke of generateSparkAppId shall give same result."); + + ApplicationAttemptSummary mockAttempt = mock(ApplicationAttemptSummary.class); + AttemptInfo mockAttemptInfo = mock(AttemptInfo.class); + when(mockAttempt.getAttemptInfo()).thenReturn(mockAttemptInfo); + 
when(mockAttemptInfo.getId()).thenReturn(2L); + when(mockStatus1.getCurrentAttemptSummary()).thenReturn(mockAttempt); + when(mockStatus2.getCurrentAttemptSummary()).thenReturn(mockAttempt); + + String appId1Attempt2 = SparkAppSubmissionWorker.generateSparkAppId(mockApp1); + Assertions.assertTrue(appId1Attempt2.contains(appName1)); + Assertions.assertNotEquals(appId1, appId1Attempt2); + Assertions.assertTrue(appId1Attempt2.length() <= DEFAULT_ID_LENGTH_LIMIT); + + String appId2Attempt2 = SparkAppSubmissionWorker.generateSparkAppId(mockApp2); + Assertions.assertNotEquals(appId2, appId2Attempt2); + Assertions.assertEquals(appId2Attempt2, SparkAppSubmissionWorker.generateSparkAppId(mockApp2)); + Assertions.assertTrue(appId2Attempt2.length() <= DEFAULT_ID_LENGTH_LIMIT); + + Assertions.assertEquals(appId1Attempt2, SparkAppSubmissionWorker.generateSparkAppId(mockApp1)); + } + + @Test + void generatedSparkAppIdShouldComplyLengthLimit() { + String namespaceName = RandomStringUtils.randomAlphabetic(253); + String appName = RandomStringUtils.randomAlphabetic(253); + + SparkApplication mockApp = mock(SparkApplication.class); + ObjectMeta appMeta = + new ObjectMetaBuilder().withName(appName).withNamespace(namespaceName).build(); + when(mockApp.getMetadata()).thenReturn(appMeta); + String appId = SparkAppSubmissionWorker.generateSparkAppId(mockApp); + Assertions.assertTrue(appId.length() <= DEFAULT_ID_LENGTH_LIMIT); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org