This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/spark-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new e747bcf  [SPARK-48017] Add Spark application submission worker for operator
e747bcf is described below

commit e747bcfab106b828bbd9f2d44968698e5dce3c33
Author: zhou-jiang <zhou_ji...@apple.com>
AuthorDate: Mon May 20 10:41:48 2024 -0700

    [SPARK-48017] Add Spark application submission worker for operator
    
    ### What changes were proposed in this pull request?
    
    This is a breakdown PR of #2 - adding a submission worker implementation for SparkApplication.
    
    ### Why are the changes needed?
    
    Spark Operator needs a submission worker to convert its abstraction (the SparkApplication API) into a k8s resource spec.
    This is a lightweight implementation based on the native k8s integration.
    
    As of now, it is based on Spark 4.0.0-preview1, but it is expected to serve all Spark LTS versions. This is feasible because the worker covers only spec generation; Spark core jars are still brought in by application images. E2E tests will be set up with the operator later to ensure that.
    
    Per the SPIP doc, future operator version(s) may add more submission worker implementations based on different Spark versions to become fully version-agnostic, at the cost of keeping multiple workers on standby.
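    
    For illustration, a reconciler-side caller could use the new API roughly as follows (a minimal sketch based on the classes added in this PR; the `app` and `client` instances are assumed to come from the reconciliation context):
    
        SparkAppSubmissionWorker worker = new SparkAppSubmissionWorker();
        // Resolve the SparkApplication custom resource into concrete k8s resources.
        SparkAppResourceSpec spec = worker.getResourceSpec(app, client, java.util.Map.of());
        // Pre-resources go first, then the driver pod, then the remaining resources
        // (which include the generated driver ConfigMap).
        spec.getDriverPreResources().forEach(r -> client.resource(r).create());
        client.pods().resource(spec.getConfiguredPod()).create();
        spec.getDriverResources().forEach(r -> client.resource(r).create());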
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit test coverage.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #10 from jiangzho/worker.
    
    Authored-by: zhou-jiang <zhou_ji...@apple.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 build.gradle                                       |   4 +
 gradle.properties                                  |   7 +
 settings.gradle                                    |   1 +
 spark-operator-api/build.gradle                    |   1 +
 .../spark/k8s/operator/utils/ModelUtils.java       |   9 +
 spark-submission-worker/build.gradle               |  18 ++
 .../spark/k8s/operator/SparkAppDriverConf.java     |  73 +++++++
 .../spark/k8s/operator/SparkAppResourceSpec.java   | 129 ++++++++++++
 .../k8s/operator/SparkAppSubmissionWorker.java     | 175 +++++++++++++++++
 .../spark/k8s/operator/SparkAppDriverConfTest.java |  75 +++++++
 .../k8s/operator/SparkAppResourceSpecTest.java     | 137 +++++++++++++
 .../k8s/operator/SparkAppSubmissionWorkerTest.java | 218 +++++++++++++++++++++
 12 files changed, 847 insertions(+)

diff --git a/build.gradle b/build.gradle
index a6c1701..c0c75d0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -25,6 +25,10 @@ subprojects {
 
   repositories {
     mavenCentral()
+    // TODO(SPARK-48326) Upgrade submission worker base Spark version to 4.0.0-preview2
+    maven {
+      url "https://repository.apache.org/content/repositories/orgapachespark-1454/"
+    }
   }
 
   apply plugin: 'checkstyle'
diff --git a/gradle.properties b/gradle.properties
index 2606179..ffa8302 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -18,17 +18,24 @@
 group=org.apache.spark.k8s.operator
 version=0.1.0
 
+# Caution: fabric8 version should be aligned with Spark dependency
 fabric8Version=6.12.1
 commonsLang3Version=3.14.0
 commonsIOVersion=2.16.1
 lombokVersion=1.18.32
 
+# Spark
+scalaVersion=2.13
+# TODO(SPARK-48326) Upgrade submission worker base Spark version to 4.0.0-preview2
+sparkVersion=4.0.0-preview1
+
 # Logging
 log4jVersion=2.22.1
 
 # Test
 junitVersion=5.10.2
 jacocoVersion=0.8.12
+mockitoVersion=5.11.0
 
 # Build Analysis
 checkstyleVersion=10.15.0
diff --git a/settings.gradle b/settings.gradle
index 69e7827..6808ec7 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -1,2 +1,3 @@
 rootProject.name = 'apache-spark-kubernetes-operator'
 include 'spark-operator-api'
+include 'spark-submission-worker'
diff --git a/spark-operator-api/build.gradle b/spark-operator-api/build.gradle
index b57beca..696415f 100644
--- a/spark-operator-api/build.gradle
+++ b/spark-operator-api/build.gradle
@@ -18,6 +18,7 @@ dependencies {
 
   testImplementation platform("org.junit:junit-bom:$junitVersion")
   testImplementation 'org.junit.jupiter:junit-jupiter'
+  testRuntimeOnly "org.junit.platform:junit-platform-launcher"
 }
 
 test {
diff --git a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java
index 454e706..03d84be 100644
--- a/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java
+++ b/spark-operator-api/src/main/java/org/apache/spark/k8s/operator/utils/ModelUtils.java
@@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.PodTemplateSpec;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.spark.k8s.operator.SparkApplication;
 import org.apache.spark.k8s.operator.spec.ApplicationSpec;
 
 public class ModelUtils {
@@ -107,4 +108,12 @@ public class ModelUtils {
         && applicationSpec.getExecutorSpec() != null
         && applicationSpec.getExecutorSpec().getPodTemplateSpec() != null;
   }
+
+  public static long getAttemptId(final SparkApplication app) {
+    long attemptId = 0L;
+    if (app.getStatus() != null && app.getStatus().getCurrentAttemptSummary() != null) {
+      attemptId = app.getStatus().getCurrentAttemptSummary().getAttemptInfo().getId();
+    }
+    return attemptId;
+  }
 }
diff --git a/spark-submission-worker/build.gradle b/spark-submission-worker/build.gradle
new file mode 100644
index 0000000..da78026
--- /dev/null
+++ b/spark-submission-worker/build.gradle
@@ -0,0 +1,18 @@
+dependencies {
+  implementation project(":spark-operator-api")
+
+  implementation("org.apache.spark:spark-kubernetes_$scalaVersion:$sparkVersion")
+
+  compileOnly("org.projectlombok:lombok:$lombokVersion")
+  annotationProcessor("org.projectlombok:lombok:$lombokVersion")
+
+  testImplementation platform("org.junit:junit-bom:$junitVersion")
+  testImplementation "org.mockito:mockito-core:$mockitoVersion"
+  testImplementation "org.junit.jupiter:junit-jupiter:$junitVersion"
+  testImplementation "io.fabric8:kubernetes-server-mock:$fabric8Version"
+  testRuntimeOnly "org.junit.platform:junit-platform-launcher"
+}
+
+test {
+  useJUnitPlatform()
+}
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
new file mode 100644
index 0000000..f2870a2
--- /dev/null
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppDriverConf.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import scala.Option;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.Config;
+import org.apache.spark.deploy.k8s.KubernetesDriverConf;
+import org.apache.spark.deploy.k8s.KubernetesVolumeUtils;
+import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+
+public class SparkAppDriverConf extends KubernetesDriverConf {
+  private SparkAppDriverConf(
+      SparkConf sparkConf,
+      String appId,
+      MainAppResource mainAppResource,
+      String mainClass,
+      String[] appArgs,
+      Option<String> proxyUser) {
+    super(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser, null);
+  }
+
+  public static SparkAppDriverConf create(
+      SparkConf sparkConf,
+      String appId,
+      MainAppResource mainAppResource,
+      String mainClass,
+      String[] appArgs,
+      Option<String> proxyUser) {
+    // pre-create check only
+    KubernetesVolumeUtils.parseVolumesWithPrefix(
+        sparkConf, Config.KUBERNETES_EXECUTOR_VOLUMES_PREFIX());
+    return new SparkAppDriverConf(sparkConf, appId, mainAppResource, mainClass, appArgs, proxyUser);
+  }
+
+  /** Application managed by operator has a deterministic prefix */
+  @Override
+  public String resourceNamePrefix() {
+    return appId();
+  }
+
+  /**
+   * Create the name to be used by the driver config map. The name consists of
+   * `resourceNamePrefix` and the Spark instance type (driver). The operator proposes a
+   * `resourceNamePrefix` that leaves naming-length margin for sub-resources to be qualified as a
+   * DNS subdomain or label. In addition, the overall config map name length is governed by
+   * `KubernetesClientUtils.configMapName`, which ensures the name length meets DNS subdomain
+   * requirements.
+   *
+   * @return proposed name to be used by the driver config map
+   */
+  public String configMapNameDriver() {
+    return KubernetesClientUtils.configMapName(String.format("%s-spark-drv", resourceNamePrefix()));
+  }
+}
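
For illustration, the naming scheme above can be sketched as follows (a minimal sketch with a hypothetical app id and main class, short enough that no truncation happens):

    SparkConf conf = new SparkConf().set("spark.kubernetes.namespace", "default");
    SparkAppDriverConf driverConf =
        SparkAppDriverConf.create(
            conf, "my-app-0", new JavaMainAppResource(Option.empty()),
            "org.example.Main", new String[0], Option.empty());
    // The secondary-resource prefix is deterministic: it is exactly the app id.
    assert "my-app-0".equals(driverConf.resourceNamePrefix());
    // configMapNameDriver() builds on "<prefix>-spark-drv"; KubernetesClientUtils.configMapName
    // caps the overall length at the 253-char DNS subdomain limit.
    assert driverConf.configMapNameDriver().contains("my-app-0-spark-drv");
    assert driverConf.configMapNameDriver().length() <= 253;
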
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
new file mode 100644
index 0000000..101b29a
--- /dev/null
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppResourceSpec.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import scala.Tuple2;
+import scala.collection.immutable.HashMap;
+import scala.collection.immutable.Map;
+import scala.jdk.CollectionConverters;
+
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import lombok.Getter;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.deploy.k8s.Config;
+import org.apache.spark.deploy.k8s.Constants;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.SparkPod;
+import org.apache.spark.deploy.k8s.submit.KubernetesClientUtils;
+
+/**
+ * Resembles the resources that would be directly launched by the operator. Based on the resolved
+ * org.apache.spark.deploy.k8s.KubernetesDriverSpec, it:
+ *
+ * <ul>
+ *   <li>Adds a ConfigMap as a resource for the driver
+ *   <li>Converts Scala types to Java for easier reference from the operator
+ * </ul>
+ *
+ * <p>This is not thread-safe and is not expected to be shared among reconciler threads
+ */
+public class SparkAppResourceSpec {
+  @Getter private final Pod configuredPod;
+  @Getter private final List<HasMetadata> driverPreResources;
+  @Getter private final List<HasMetadata> driverResources;
+  private final SparkAppDriverConf kubernetesDriverConf;
+
+  public SparkAppResourceSpec(
+      SparkAppDriverConf kubernetesDriverConf, KubernetesDriverSpec kubernetesDriverSpec) {
+    this.kubernetesDriverConf = kubernetesDriverConf;
+    String namespace = kubernetesDriverConf.sparkConf().get(Config.KUBERNETES_NAMESPACE().key());
+    Map<String, String> confFilesMap =
+        KubernetesClientUtils.buildSparkConfDirFilesMap(
+                kubernetesDriverConf.configMapNameDriver(),
+                kubernetesDriverConf.sparkConf(),
+                kubernetesDriverSpec.systemProperties())
+            .$plus(new Tuple2<>(Config.KUBERNETES_NAMESPACE().key(), namespace));
+    SparkPod sparkPod = addConfigMap(kubernetesDriverSpec.pod(), confFilesMap);
+    this.configuredPod =
+        new PodBuilder(sparkPod.pod())
+            .editSpec()
+            .addToContainers(sparkPod.container())
+            .endSpec()
+            .build();
+    this.driverPreResources =
+        new ArrayList<>(
+            CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverPreKubernetesResources())
+                .asJava());
+    this.driverResources =
+        new ArrayList<>(
+            CollectionConverters.SeqHasAsJava(kubernetesDriverSpec.driverKubernetesResources())
+                .asJava());
+    this.driverResources.add(
+        KubernetesClientUtils.buildConfigMap(
+            kubernetesDriverConf.configMapNameDriver(), confFilesMap, new HashMap<>()));
+    this.driverPreResources.forEach(r -> setNamespaceIfMissing(r, namespace));
+    this.driverResources.forEach(r -> setNamespaceIfMissing(r, namespace));
+  }
+
+  private void setNamespaceIfMissing(HasMetadata resource, String namespace) {
+    if (StringUtils.isNotEmpty(resource.getMetadata().getNamespace())) {
+      return;
+    }
+    resource.getMetadata().setNamespace(namespace);
+  }
+
+  private SparkPod addConfigMap(SparkPod pod, Map<String, String> confFilesMap) {
+    Container containerWithConfigMapVolume =
+        new ContainerBuilder(pod.container())
+            .addNewEnv()
+            .withName(Constants.ENV_SPARK_CONF_DIR())
+            .withValue(Constants.SPARK_CONF_DIR_INTERNAL())
+            .endEnv()
+            .addNewVolumeMount()
+            .withName(Constants.SPARK_CONF_VOLUME_DRIVER())
+            .withMountPath(Constants.SPARK_CONF_DIR_INTERNAL())
+            .endVolumeMount()
+            .build();
+    Pod podWithConfigMapVolume =
+        new PodBuilder(pod.pod())
+            .editSpec()
+            .addNewVolume()
+            .withName(Constants.SPARK_CONF_VOLUME_DRIVER())
+            .withNewConfigMap()
+            .withItems(
+                CollectionConverters.SeqHasAsJava(
+                        KubernetesClientUtils.buildKeyToPathObjects(confFilesMap))
+                    .asJava())
+            .withName(kubernetesDriverConf.configMapNameDriver())
+            .endConfigMap()
+            .endVolume()
+            .endSpec()
+            .build();
+    return new SparkPod(podWithConfigMapVolume, containerWithConfigMapVolume);
+  }
+}
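
For illustration, what a consumer observes after construction (a sketch only; the `spec` instance is assumed to come from SparkAppSubmissionWorker.getResourceSpec):

    // Scala collections are already converted; plain java.util.List comes back.
    List<HasMetadata> preResources = spec.getDriverPreResources(); // applied before the driver pod
    List<HasMetadata> resources = spec.getDriverResources(); // includes the generated ConfigMap
    Pod driverPod = spec.getConfiguredPod();
    // Per addConfigMap above, the driver container mounts the conf ConfigMap at
    // SPARK_CONF_DIR_INTERNAL and points the SPARK_CONF_DIR env var at that path.
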
diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
new file mode 100644
index 0000000..57013cb
--- /dev/null
+++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import java.math.BigInteger;
+import java.util.Map;
+
+import scala.Option;
+
+import io.fabric8.kubernetes.client.KubernetesClient;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverBuilder;
+import org.apache.spark.deploy.k8s.submit.MainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.utils.ModelUtils;
+
+/**
+ * Similar to org.apache.spark.deploy.k8s.submit.KubernetesClientApplication. This reads args from
+ * the SparkApplication instead of starting a separate spark-submit process
+ */
+public class SparkAppSubmissionWorker {
+  // Default length limit for generated app id. Generated id is used as resource prefix when
+  // user-provided id is too long for this purpose. This applies to all resources associated with
+  // the Spark app (including the k8s service, which has a different naming length limit). Thus,
+  // we truncate the hash part to 46 chars to leave some margin for the Spark resource prefix and
+  // suffix (e.g. 'spark-', '-driver-svc', etc.)
+  public static final int DEFAULT_ID_LENGTH_LIMIT = 46;
+  // Default length limit to be applied to the hash-based part of generated id
+  public static final int DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT = 36;
+  // Radix value used when generating hash-based identifier
+  public static final int DEFAULT_ENCODE_BASE = 36;
+  public static final String DEFAULT_MASTER_URL_PREFIX = "k8s://";
+  public static final String MASTER_URL_PREFIX_PROPS_NAME = "spark.master.url.prefix";
+
+  /**
+   * Build the secondary resource spec for the given app with the Spark developer API, with
+   * defaults / overrides as:
+   *
+   * <ul>
+   *   <li>spark.kubernetes.namespace - if provided and the provided value is different from the
+   *       namespace that the SparkApp resides in, it would be force-overridden to the same value
+   *       as the SparkApp custom resource.
+   *   <li>spark.jars - if not provided, this would be set to the value in .spec.jars
+   *   <li>spark.submit.pyFiles - if not provided, this would be set to the value in .spec.pyFiles
+   *   <li>spark.master - if not provided, this would be automatically built with
+   *       KUBERNETES_SERVICE_HOST and KUBERNETES_SERVICE_PORT, and the prefix 'k8s://' for the
+   *       native Spark ExternalClusterManager. It is possible to invoke a custom ClusterManager:
+   *       to do so, either set 'spark.master', or set `spark.master.url.prefix` so the submission
+   *       worker can build the master url based on it.
+   * </ul>
+   *
+   * @param app the SparkApp resource
+   * @param client k8s client
+   * @param confOverrides key-value pairs of conf overrides for the given app.
+   * @return secondary resource spec as `SparkAppResourceSpec`
+   */
+  public SparkAppResourceSpec getResourceSpec(
+      SparkApplication app, KubernetesClient client, Map<String, String> confOverrides) {
+    SparkAppDriverConf appDriverConf = buildDriverConf(app, confOverrides);
+    return buildResourceSpec(appDriverConf, client);
+  }
+
+  protected SparkAppDriverConf buildDriverConf(
+      SparkApplication app, Map<String, String> confOverrides) {
+    ApplicationSpec applicationSpec = app.getSpec();
+    SparkConf effectiveSparkConf = new SparkConf();
+    if (MapUtils.isNotEmpty(applicationSpec.getSparkConf())) {
+      for (String confKey : applicationSpec.getSparkConf().keySet()) {
+        effectiveSparkConf.set(confKey, applicationSpec.getSparkConf().get(confKey));
+      }
+    }
+    if (MapUtils.isNotEmpty(confOverrides)) {
+      for (Map.Entry<String, String> entry : confOverrides.entrySet()) {
+        effectiveSparkConf.set(entry.getKey(), entry.getValue());
+      }
+    }
+    effectiveSparkConf.set("spark.kubernetes.namespace", 
app.getMetadata().getNamespace());
+    MainAppResource primaryResource = new JavaMainAppResource(Option.empty());
+    if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
+      primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars()));
+      effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
+    } else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
+      primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles());
+      effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles());
+    } else if (StringUtils.isNotEmpty(applicationSpec.getSparkRFiles())) {
+      primaryResource = new RMainAppResource(applicationSpec.getSparkRFiles());
+    }
+    String sparkMasterUrlPrefix =
+        effectiveSparkConf.get(MASTER_URL_PREFIX_PROPS_NAME, DEFAULT_MASTER_URL_PREFIX);
+    effectiveSparkConf.setIfMissing(
+        "spark.master",
+        sparkMasterUrlPrefix + "https://$KUBERNETES_SERVICE_HOST:$KUBERNETES_SERVICE_PORT");
+    String appId = generateSparkAppId(app);
+    effectiveSparkConf.setIfMissing("spark.app.id", appId);
+    return SparkAppDriverConf.create(
+        effectiveSparkConf,
+        appId,
+        primaryResource,
+        applicationSpec.getMainClass(),
+        applicationSpec.getDriverArgs().toArray(String[]::new),
+        Option.apply(applicationSpec.getProxyUser()));
+  }
+
+  protected SparkAppResourceSpec buildResourceSpec(
+      SparkAppDriverConf kubernetesDriverConf, KubernetesClient client) {
+    KubernetesDriverBuilder builder = new KubernetesDriverBuilder();
+    KubernetesDriverSpec kubernetesDriverSpec =
+        builder.buildFromFeatures(kubernetesDriverConf, client);
+    return new SparkAppResourceSpec(kubernetesDriverConf, kubernetesDriverSpec);
+  }
+
+  /**
+   * Generate a Spark application id. Similar to `KubernetesConf.getKubernetesAppId()`. This is
+   * deterministic per attempt per Spark App in order to ensure operator reconciliation idempotency
+   */
+  public static String generateSparkAppId(final SparkApplication app) {
+    long attemptId = ModelUtils.getAttemptId(app);
+    String preferredId = String.format("%s-%d", app.getMetadata().getName(), attemptId);
+    if (preferredId.length() > DEFAULT_ID_LENGTH_LIMIT) {
+      int preferredIdPrefixLength =
+          DEFAULT_ID_LENGTH_LIMIT - DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT - 1;
+      String preferredIdPrefix = preferredId.substring(0, preferredIdPrefixLength);
+      return generateHashBasedId(
+          preferredIdPrefix,
+          app.getMetadata().getNamespace(),
+          app.getMetadata().getName(),
+          String.valueOf(attemptId));
+    } else {
+      return preferredId;
+    }
+  }
+
+  /**
+   * Generate a hash-based id with the given identifiers. The hash part has a length limit of
+   * `DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT`. In addition, the prefix is applied to the
+   * generated hash.
+   *
+   * @param prefix string prefix to be applied to the generated hash-based id.
+   * @param identifiers keys to generate the hash
+   * @return generated hash-based id
+   */
+  public static String generateHashBasedId(final String prefix, final String... identifiers) {
+    String sha256Hash =
+        new BigInteger(1, DigestUtils.sha256(String.join("/", identifiers)))
+            .toString(DEFAULT_ENCODE_BASE);
+    String truncatedIdentifiersHash =
+        sha256Hash.substring(0, DEFAULT_HASH_BASED_IDENTIFIER_LENGTH_LIMIT);
+    return String.join("-", prefix, truncatedIdentifiersHash);
+  }
+}
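
For illustration, the id-length arithmetic above works out as follows (a sketch with hypothetical names):

    // App "pi-app" at attempt 0: preferredId = "pi-app-0" (8 chars <= 46), used as-is.
    // With a 60-char name at attempt 3, preferredId is 62 chars > DEFAULT_ID_LENGTH_LIMIT,
    // so only 46 - 36 - 1 = 9 leading chars survive as the prefix:
    String longName = "a".repeat(60);
    String id =
        SparkAppSubmissionWorker.generateHashBasedId(
            (longName + "-3").substring(0, 9), "ns", longName, "3"); // namespace/name/attempt hashed
    // 9-char prefix + "-" + 36-char base-36 SHA-256 digest = exactly 46 chars
    assert id.length() == 46;
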
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
new file mode 100644
index 0000000..d19cb05
--- /dev/null
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppDriverConfTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+import java.util.UUID;
+
+import scala.Option;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+
+class SparkAppDriverConfTest {
+  @Test
+  void testResourceNamePrefix() {
+    // Resource prefix shall be deterministic per SparkApp per attempt
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.set("foo", "bar");
+    sparkConf.set("spark.executor.instances", "1");
+    String appId = UUID.randomUUID().toString();
+    SparkAppDriverConf sparkAppDriverConf =
+        SparkAppDriverConf.create(
+            sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
+    String resourcePrefix = sparkAppDriverConf.resourceNamePrefix();
+    assertEquals(
+        resourcePrefix,
+        appId,
+        "Secondary resource prfix should be the same as app id, "
+            + "but different values are detected");
+    assertTrue(
+        sparkAppDriverConf.configMapNameDriver().contains(resourcePrefix),
+        "ConfigMap name" + " should include secondary resource prefix");
+    assertTrue(
+        sparkAppDriverConf.driverServiceName().contains(resourcePrefix),
+        "Driver service " + "name should include secondary resource prefix");
+  }
+
+  @Test
+  void testConfigMapNameDriver() {
+    SparkConf sparkConf = new SparkConf();
+    sparkConf.set("foo", "bar");
+    sparkConf.set("spark.executor.instances", "1");
+    String appId = RandomStringUtils.randomAlphabetic(1000);
+    SparkAppDriverConf sparkAppDriverConf =
+        SparkAppDriverConf.create(
+            sparkConf, appId, mock(JavaMainAppResource.class), "foo", null, Option.empty());
+    String configMapNameDriver = sparkAppDriverConf.configMapNameDriver();
+    assertTrue(
+        configMapNameDriver.length() <= 253,
+        "config map name length should always comply with k8s DNS subdomain length");
+  }
+}
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
new file mode 100644
index 0000000..929f84e
--- /dev/null
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppResourceSpecTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+
+import scala.collection.immutable.HashMap;
+import scala.collection.immutable.Seq;
+import scala.jdk.javaapi.CollectionConverters;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeMount;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.KubernetesDriverSpec;
+import org.apache.spark.deploy.k8s.SparkPod;
+
+class SparkAppResourceSpecTest {
+
+  @Test
+  void testDriverResourceIncludesConfigMap() {
+    SparkAppDriverConf mockConf = mock(SparkAppDriverConf.class);
+    when(mockConf.configMapNameDriver()).thenReturn("foo-configmap");
+    when(mockConf.sparkConf())
+        .thenReturn(new SparkConf().set("spark.kubernetes.namespace", "foo-namespace"));
+
+    KubernetesDriverSpec mockSpec = mock(KubernetesDriverSpec.class);
+    Pod driver = buildBasicPod("driver");
+    SparkPod sparkPod = new SparkPod(driver, buildBasicContainer());
+
+    // Add some mock resources and pre-resources
+    Pod pod1 = buildBasicPod("pod-1");
+    Pod pod2 = buildBasicPod("pod-2");
+    List<HasMetadata> preResourceList = Collections.singletonList(pod1);
+    List<HasMetadata> resourceList = Collections.singletonList(pod2);
+    Seq<HasMetadata> preResourceSeq = 
CollectionConverters.asScala(preResourceList).toList();
+    Seq<HasMetadata> resourceSeq = 
CollectionConverters.asScala(resourceList).toList();
+    when(mockSpec.driverKubernetesResources()).thenReturn(resourceSeq);
+    when(mockSpec.driverPreKubernetesResources()).thenReturn(preResourceSeq);
+    when(mockSpec.pod()).thenReturn(sparkPod);
+    when(mockSpec.systemProperties()).thenReturn(new HashMap<>());
+
+    SparkAppResourceSpec appResourceSpec = new SparkAppResourceSpec(mockConf, mockSpec);
+
+    Assertions.assertEquals(2, appResourceSpec.getDriverResources().size());
+    Assertions.assertEquals(1, appResourceSpec.getDriverPreResources().size());
+    Assertions.assertEquals(Pod.class, appResourceSpec.getDriverResources().get(0).getClass());
+    Assertions.assertEquals(
+        ConfigMap.class, appResourceSpec.getDriverResources().get(1).getClass());
+    Assertions.assertEquals(pod1, appResourceSpec.getDriverPreResources().get(0));
+    Assertions.assertEquals(pod2, appResourceSpec.getDriverResources().get(0));
+
+    ConfigMap proposedConfigMap = (ConfigMap) appResourceSpec.getDriverResources().get(1);
+    Assertions.assertEquals("foo-configmap", proposedConfigMap.getMetadata().getName());
+    Assertions.assertEquals(
+        "foo-namespace", proposedConfigMap.getData().get("spark.kubernetes.namespace"));
+    Assertions.assertEquals("foo-namespace", proposedConfigMap.getMetadata().getNamespace());
+
+    Assertions.assertEquals(2, appResourceSpec.getConfiguredPod().getSpec().getVolumes().size());
+    Volume proposedConfigVolume = appResourceSpec.getConfiguredPod().getSpec().getVolumes().get(1);
+    Assertions.assertEquals("foo-configmap", proposedConfigVolume.getConfigMap().getName());
+
+    Assertions.assertEquals(2, appResourceSpec.getConfiguredPod().getSpec().getContainers().size());
+    Assertions.assertEquals(
+        2,
+        appResourceSpec
+            .getConfiguredPod()
+            .getSpec()
+            .getContainers()
+            .get(1)
+            .getVolumeMounts()
+            .size());
+    VolumeMount proposedConfigVolumeMount =
+        appResourceSpec
+            .getConfiguredPod()
+            .getSpec()
+            .getContainers()
+            .get(1)
+            .getVolumeMounts()
+            .get(1);
+    Assertions.assertEquals(proposedConfigVolume.getName(), proposedConfigVolumeMount.getName());
+  }
+
+  protected Container buildBasicContainer() {
+    return new ContainerBuilder()
+        .withName("foo-container")
+        .addNewVolumeMount()
+        .withName("placeholder")
+        .endVolumeMount()
+        .build();
+  }
+
+  protected Pod buildBasicPod(String name) {
+    return new PodBuilder()
+        .withNewMetadata()
+        .withName(name)
+        .endMetadata()
+        .withNewSpec()
+        .addNewContainer()
+        .withName("placeholder")
+        .endContainer()
+        .addNewVolume()
+        .withName("placeholder")
+        .endVolume()
+        .endSpec()
+        .build();
+  }
+}
diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
new file mode 100644
index 0000000..caceed0
--- /dev/null
+++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.k8s.operator;
+
+import static org.apache.spark.k8s.operator.SparkAppSubmissionWorker.DEFAULT_ID_LENGTH_LIMIT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.ObjectMeta;
+import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedConstruction;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.deploy.k8s.submit.JavaMainAppResource;
+import org.apache.spark.deploy.k8s.submit.PythonMainAppResource;
+import org.apache.spark.deploy.k8s.submit.RMainAppResource;
+import org.apache.spark.k8s.operator.spec.ApplicationSpec;
+import org.apache.spark.k8s.operator.status.ApplicationAttemptSummary;
+import org.apache.spark.k8s.operator.status.ApplicationStatus;
+import org.apache.spark.k8s.operator.status.AttemptInfo;
+
+class SparkAppSubmissionWorkerTest {
+  @Test
+  void buildDriverConfShouldApplySpecAndPropertiesOverride() {
+    Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
+    try (MockedConstruction<SparkAppDriverConf> mocked =
+        mockConstruction(
+            SparkAppDriverConf.class,
+            (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
+      SparkApplication mockApp = mock(SparkApplication.class);
+      ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+      ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+      Map<String, String> appProps = new HashMap<>();
+      appProps.put("foo", "bar");
+      appProps.put("spark.executor.instances", "1");
+      appProps.put("spark.kubernetes.namespace", "ns2");
+      Map<String, String> overrides = new HashMap<>();
+      overrides.put("spark.executor.instances", "5");
+      overrides.put("spark.kubernetes.namespace", "ns3");
+      when(mockSpec.getSparkConf()).thenReturn(appProps);
+      when(mockApp.getSpec()).thenReturn(mockSpec);
+      when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getProxyUser()).thenReturn("foo-user");
+      when(mockSpec.getMainClass()).thenReturn("foo-class");
+      when(mockSpec.getDriverArgs()).thenReturn(List.of("a", "b"));
+
+      SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+      SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, overrides);
+      Assertions.assertEquals(6, constructorArgs.get(conf).size());
+
+      // validate SparkConf with override
+      Assertions.assertTrue(constructorArgs.get(conf).get(0) instanceof SparkConf);
+      SparkConf createdConf = (SparkConf) constructorArgs.get(conf).get(0);
+      Assertions.assertEquals("bar", createdConf.get("foo"));
+      Assertions.assertEquals("5", createdConf.get("spark.executor.instances"));
+
+      Assertions.assertEquals(
+          "ns1",
+          createdConf.get("spark.kubernetes.namespace"),
+          "namespace from CR takes highest precedence");
+
+      // validate main resources
+      Assertions.assertTrue(constructorArgs.get(conf).get(2) instanceof JavaMainAppResource);
+      JavaMainAppResource mainResource = (JavaMainAppResource) constructorArgs.get(conf).get(2);
+      Assertions.assertTrue(mainResource.primaryResource().isEmpty());
+
+      Assertions.assertEquals("foo-class", constructorArgs.get(conf).get(3));
+
+      Assertions.assertTrue(constructorArgs.get(conf).get(4) instanceof String[]);
+      String[] capturedArgs = (String[]) constructorArgs.get(conf).get(4);
+      Assertions.assertEquals(2, capturedArgs.length);
+      Assertions.assertEquals("a", capturedArgs[0]);
+      Assertions.assertEquals("b", capturedArgs[1]);
+    }
+  }
+
+  @Test
+  void buildDriverConfForPythonApp() {
+    Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
+    try (MockedConstruction<SparkAppDriverConf> mocked =
+        mockConstruction(
+            SparkAppDriverConf.class,
+            (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
+      SparkApplication mockApp = mock(SparkApplication.class);
+      ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+      ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+      when(mockApp.getSpec()).thenReturn(mockSpec);
+      when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getPyFiles()).thenReturn("foo");
+
+      SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+      SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
+      Assertions.assertEquals(6, constructorArgs.get(conf).size());
+
+      // validate main resources
+      Assertions.assertTrue(constructorArgs.get(conf).get(2) instanceof PythonMainAppResource);
+      PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
+      Assertions.assertEquals("foo", mainResource.primaryResource());
+    }
+  }
+
+  @Test
+  void buildDriverConfForRApp() {
+    Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
+    try (MockedConstruction<SparkAppDriverConf> mocked =
+        mockConstruction(
+            SparkAppDriverConf.class,
+            (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
+      SparkApplication mockApp = mock(SparkApplication.class);
+      ApplicationSpec mockSpec = mock(ApplicationSpec.class);
+      ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
+      when(mockApp.getSpec()).thenReturn(mockSpec);
+      when(mockApp.getMetadata()).thenReturn(appMeta);
+      when(mockSpec.getSparkRFiles()).thenReturn("foo");
+
+      SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
+      SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
+      Assertions.assertEquals(6, constructorArgs.get(conf).size());
+
+      // validate main resources
+      Assertions.assertTrue(constructorArgs.get(conf).get(2) instanceof RMainAppResource);
+      RMainAppResource mainResource = (RMainAppResource) constructorArgs.get(conf).get(2);
+      Assertions.assertEquals("foo", mainResource.primaryResource());
+    }
+  }
+
+  @Test
+  void sparkAppIdShouldBeDeterministicPerAppPerAttempt() {
+    SparkApplication mockApp1 = mock(SparkApplication.class);
+    SparkApplication mockApp2 = mock(SparkApplication.class);
+    ApplicationStatus mockStatus1 = mock(ApplicationStatus.class);
+    ApplicationStatus mockStatus2 = mock(ApplicationStatus.class);
+    String appName1 = "app1";
+    String appName2 = "app2";
+    ObjectMeta appMeta1 = new ObjectMetaBuilder().withName(appName1).withNamespace("ns").build();
+    ObjectMeta appMeta2 = new ObjectMetaBuilder().withName(appName2).withNamespace("ns").build();
+    when(mockApp1.getMetadata()).thenReturn(appMeta1);
+    when(mockApp2.getMetadata()).thenReturn(appMeta2);
+    when(mockApp1.getStatus()).thenReturn(mockStatus1);
+    when(mockApp2.getStatus()).thenReturn(mockStatus2);
+
+    String appId1 = SparkAppSubmissionWorker.generateSparkAppId(mockApp1);
+    String appId2 = SparkAppSubmissionWorker.generateSparkAppId(mockApp2);
+
+    Assertions.assertNotEquals(appId1, appId2);
+    Assertions.assertTrue(appId1.contains(appName1));
+    Assertions.assertTrue(appId1.length() <= DEFAULT_ID_LENGTH_LIMIT);
+    Assertions.assertTrue(appId2.length() <= DEFAULT_ID_LENGTH_LIMIT);
+    // multiple invoke shall give same result
+    Assertions.assertEquals(
+        appId1,
+        SparkAppSubmissionWorker.generateSparkAppId(mockApp1),
+        "Multiple invoke of generateSparkAppId shall give same result.");
+    Assertions.assertEquals(
+        appId2,
+        SparkAppSubmissionWorker.generateSparkAppId(mockApp2),
+        "Multiple invoke of generateSparkAppId shall give same result.");
+
+    ApplicationAttemptSummary mockAttempt = mock(ApplicationAttemptSummary.class);
+    AttemptInfo mockAttemptInfo = mock(AttemptInfo.class);
+    when(mockAttempt.getAttemptInfo()).thenReturn(mockAttemptInfo);
+    when(mockAttemptInfo.getId()).thenReturn(2L);
+    when(mockStatus1.getCurrentAttemptSummary()).thenReturn(mockAttempt);
+    when(mockStatus2.getCurrentAttemptSummary()).thenReturn(mockAttempt);
+
+    String appId1Attempt2 = SparkAppSubmissionWorker.generateSparkAppId(mockApp1);
+    Assertions.assertTrue(appId1Attempt2.contains(appName1));
+    Assertions.assertNotEquals(appId1, appId1Attempt2);
+    Assertions.assertTrue(appId1Attempt2.length() <= DEFAULT_ID_LENGTH_LIMIT);
+
+    String appId2Attempt2 = SparkAppSubmissionWorker.generateSparkAppId(mockApp2);
+    Assertions.assertNotEquals(appId2, appId2Attempt2);
+    Assertions.assertEquals(appId2Attempt2, SparkAppSubmissionWorker.generateSparkAppId(mockApp2));
+    Assertions.assertTrue(appId2Attempt2.length() <= DEFAULT_ID_LENGTH_LIMIT);
+
+    Assertions.assertEquals(appId1Attempt2, SparkAppSubmissionWorker.generateSparkAppId(mockApp1));
+  }
+
+  @Test
+  void generatedSparkAppIdShouldComplyLengthLimit() {
+    String namespaceName = RandomStringUtils.randomAlphabetic(253);
+    String appName = RandomStringUtils.randomAlphabetic(253);
+
+    SparkApplication mockApp = mock(SparkApplication.class);
+    ObjectMeta appMeta =
+        new ObjectMetaBuilder().withName(appName).withNamespace(namespaceName).build();
+    when(mockApp.getMetadata()).thenReturn(appMeta);
+    String appId = SparkAppSubmissionWorker.generateSparkAppId(mockApp);
+    Assertions.assertTrue(appId.length() <= DEFAULT_ID_LENGTH_LIMIT);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

