This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ccb0eb699f7c [SPARK-48038][K8S] Promote driverServiceName to 
KubernetesDriverConf
ccb0eb699f7c is described below

commit ccb0eb699f7c54aa3902d1ebbb34684693b563de
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Mon Apr 29 08:35:13 2024 -0700

    [SPARK-48038][K8S] Promote driverServiceName to KubernetesDriverConf
    
    ### What changes were proposed in this pull request?
    
    Promote `driverServiceName` from `DriverServiceFeatureStep` to 
`KubernetesDriverConf`.
    
    ### Why are the changes needed?
    
    To allow other feature steps, e.g. ingress(proposed in SPARK-47954), to 
access `driverServiceName`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    UT has been updated.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46276 from pan3793/SPARK-48038.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/deploy/k8s/KubernetesConf.scala   | 22 +++++++++++++++++++---
 .../k8s/features/DriverServiceFeatureStep.scala    | 14 ++------------
 .../spark/deploy/k8s/KubernetesTestConf.scala      |  6 ++++--
 .../features/DriverServiceFeatureStepSuite.scala   | 17 +++++++++--------
 4 files changed, 34 insertions(+), 25 deletions(-)

diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
index b55f9317d10b..fda772b737fe 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala
@@ -24,12 +24,13 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.{SPARK_VERSION, SparkConf}
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._
 import org.apache.spark.deploy.k8s.submit._
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys.{CONFIG, EXECUTOR_ENV_REGEX}
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * Structure containing metadata for Kubernetes logic to build Spark pods.
@@ -83,12 +84,27 @@ private[spark] class KubernetesDriverConf(
     val mainAppResource: MainAppResource,
     val mainClass: String,
     val appArgs: Array[String],
-    val proxyUser: Option[String])
-  extends KubernetesConf(sparkConf) {
+    val proxyUser: Option[String],
+    clock: Clock = new SystemClock())
+  extends KubernetesConf(sparkConf) with Logging {
 
   def driverNodeSelector: Map[String, String] =
     KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, 
KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX)
 
+  lazy val driverServiceName: String = {
+    val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX"
+    if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
+      preferredServiceName
+    } else {
+      val randomServiceId = KubernetesUtils.uniqueID(clock)
+      val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+      logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is " +
+        s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling 
back to use " +
+        s"$shorterServiceName as the driver service's name.")
+      shorterServiceName
+    }
+  }
+
   override val resourceNamePrefix: String = {
     val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) 
else None
     custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
index cba4f442371c..9adfb2b8de49 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala
@@ -20,7 +20,7 @@ import scala.jdk.CollectionConverters._
 
 import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder}
 
-import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, 
SparkPod}
+import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod}
 import 
org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH, 
KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, 
KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY}
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.{config, Logging}
@@ -39,17 +39,7 @@ private[spark] class DriverServiceFeatureStep(
     s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's 
hostname will be " +
       "managed via a Kubernetes service.")
 
-  private val preferredServiceName = 
s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX"
-  private val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
-    preferredServiceName
-  } else {
-    val randomServiceId = KubernetesUtils.uniqueID(clock = clock)
-    val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
-    logWarning(s"Driver's hostname would preferably be $preferredServiceName, 
but this is " +
-      s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling 
back to use " +
-      s"$shorterServiceName as the driver service's name.")
-    shorterServiceName
-  }
+  private val resolvedServiceName = kubernetesConf.driverServiceName
   private val ipFamilyPolicy =
     kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY)
   private val ipFamilies =
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
index d6a60b1edea2..b70b9348d23b 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala
@@ -22,6 +22,7 @@ import io.fabric8.kubernetes.api.model.Pod
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, 
MainAppResource}
+import org.apache.spark.util.{Clock, SystemClock}
 
 /**
  * Builder methods for KubernetesConf that allow easy control over what to 
return for a few
@@ -52,7 +53,8 @@ object KubernetesTestConf {
       secretEnvNamesToKeyRefs: Map[String, String] = Map.empty,
       secretNamesToMountPaths: Map[String, String] = Map.empty,
       volumes: Seq[KubernetesVolumeSpec] = Seq.empty,
-      proxyUser: Option[String] = None): KubernetesDriverConf = {
+      proxyUser: Option[String] = None,
+      clock: Clock = new SystemClock()): KubernetesDriverConf = {
     val conf = sparkConf.clone()
 
     resourceNamePrefix.foreach { prefix =>
@@ -67,7 +69,7 @@ object KubernetesTestConf {
     setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, 
secretEnvNamesToKeyRefs)
     setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes)
 
-    new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, 
proxyUser)
+    new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, 
proxyUser, clock)
   }
   // scalastyle:on argcount
 
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
index 06d322c9d19b..9d6a3de0f3bd 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala
@@ -109,16 +109,17 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite 
{
   }
 
   test("Long prefixes should switch to using a generated unique name.") {
+    val clock = new ManualClock()
     val sparkConf = new SparkConf(false)
       .set(KUBERNETES_NAMESPACE, "my-namespace")
-    val kconf = KubernetesTestConf.createDriverConf(
-      sparkConf = sparkConf,
-      resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
-      labels = DRIVER_LABELS)
-    val clock = new ManualClock()
 
     // Ensure that multiple services created at the same time generate unique 
names.
     val services = (1 to 10).map { _ =>
+      val kconf = KubernetesTestConf.createDriverConf(
+        sparkConf = sparkConf,
+        resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX),
+        labels = DRIVER_LABELS,
+        clock = clock)
       val configurationStep = new DriverServiceFeatureStep(kconf, clock = 
clock)
       val serviceName = configurationStep
         .getAdditionalKubernetesResources()
@@ -130,11 +131,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite 
{
       val hostAddress = configurationStep
         .getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key)
 
-      (serviceName -> hostAddress)
-    }.toMap
+      Tuple3(kconf, serviceName, hostAddress)
+    }
 
     assert(services.size === 10)
-    services.foreach { case (name, address) =>
+    services.foreach { case (kconf, name, address) =>
       assert(!name.startsWith(kconf.resourceNamePrefix))
       assert(!address.startsWith(kconf.resourceNamePrefix))
       assert(InternetDomainName.isValid(address))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to