This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ccb0eb699f7c [SPARK-48038][K8S] Promote driverServiceName to KubernetesDriverConf ccb0eb699f7c is described below commit ccb0eb699f7c54aa3902d1ebbb34684693b563de Author: Cheng Pan <cheng...@apache.org> AuthorDate: Mon Apr 29 08:35:13 2024 -0700 [SPARK-48038][K8S] Promote driverServiceName to KubernetesDriverConf ### What changes were proposed in this pull request? Promote `driverServiceName` from `DriverServiceFeatureStep` to `KubernetesDriverConf`. ### Why are the changes needed? To allow other feature steps, e.g. ingress(proposed in SPARK-47954), to access `driverServiceName`. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT has been updated. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46276 from pan3793/SPARK-48038. Authored-by: Cheng Pan <cheng...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../apache/spark/deploy/k8s/KubernetesConf.scala | 22 +++++++++++++++++++--- .../k8s/features/DriverServiceFeatureStep.scala | 14 ++------------ .../spark/deploy/k8s/KubernetesTestConf.scala | 6 ++++-- .../features/DriverServiceFeatureStepSuite.scala | 17 +++++++++-------- 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index b55f9317d10b..fda772b737fe 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -24,12 +24,13 @@ import org.apache.commons.lang3.StringUtils import org.apache.spark.{SPARK_VERSION, SparkConf} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.features.DriverServiceFeatureStep._ import org.apache.spark.deploy.k8s.submit._ import org.apache.spark.internal.{Logging, MDC} import org.apache.spark.internal.LogKeys.{CONFIG, EXECUTOR_ENV_REGEX} import org.apache.spark.internal.config.ConfigEntry import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID -import org.apache.spark.util.Utils +import org.apache.spark.util.{Clock, SystemClock, Utils} /** * Structure containing metadata for Kubernetes logic to build Spark pods. @@ -83,12 +84,27 @@ private[spark] class KubernetesDriverConf( val mainAppResource: MainAppResource, val mainClass: String, val appArgs: Array[String], - val proxyUser: Option[String]) - extends KubernetesConf(sparkConf) { + val proxyUser: Option[String], + clock: Clock = new SystemClock()) + extends KubernetesConf(sparkConf) with Logging { def driverNodeSelector: Map[String, String] = KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_NODE_SELECTOR_PREFIX) + lazy val driverServiceName: String = { + val preferredServiceName = s"$resourceNamePrefix$DRIVER_SVC_POSTFIX" + if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName + } else { + val randomServiceId = KubernetesUtils.uniqueID(clock) + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + + s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " + + s"$shorterServiceName as the driver service's name.") + shorterServiceName + } + } + override val resourceNamePrefix: String = { val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName)) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala index cba4f442371c..9adfb2b8de49 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStep.scala @@ -20,7 +20,7 @@ import scala.jdk.CollectionConverters._ import io.fabric8.kubernetes.api.model.{HasMetadata, ServiceBuilder} -import org.apache.spark.deploy.k8s.{KubernetesDriverConf, KubernetesUtils, SparkPod} +import org.apache.spark.deploy.k8s.{KubernetesDriverConf, SparkPod} import org.apache.spark.deploy.k8s.Config.{KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH, KUBERNETES_DRIVER_SERVICE_IP_FAMILIES, KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.{config, Logging} @@ -39,17 +39,7 @@ private[spark] class DriverServiceFeatureStep( s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " + "managed via a Kubernetes service.") - private val preferredServiceName = s"${kubernetesConf.resourceNamePrefix}$DRIVER_SVC_POSTFIX" - private val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { - preferredServiceName - } else { - val randomServiceId = KubernetesUtils.uniqueID(clock = clock) - val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" - logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + - s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use " + - s"$shorterServiceName as the driver service's name.") - shorterServiceName - } + private val resolvedServiceName = kubernetesConf.driverServiceName private val ipFamilyPolicy = kubernetesConf.sparkConf.get(KUBERNETES_DRIVER_SERVICE_IP_FAMILY_POLICY) private val ipFamilies = diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala index d6a60b1edea2..b70b9348d23b 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/KubernetesTestConf.scala @@ -22,6 +22,7 @@ import io.fabric8.kubernetes.api.model.Pod import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource} +import org.apache.spark.util.{Clock, SystemClock} /** * Builder methods for KubernetesConf that allow easy control over what to return for a few @@ -52,7 +53,8 @@ object KubernetesTestConf { secretEnvNamesToKeyRefs: Map[String, String] = Map.empty, secretNamesToMountPaths: Map[String, String] = Map.empty, volumes: Seq[KubernetesVolumeSpec] = Seq.empty, - proxyUser: Option[String] = None): KubernetesDriverConf = { + proxyUser: Option[String] = None, + clock: Clock = new SystemClock()): KubernetesDriverConf = { val conf = sparkConf.clone() resourceNamePrefix.foreach { prefix => @@ -67,7 +69,7 @@ object KubernetesTestConf { setPrefixedConfigs(conf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX, secretEnvNamesToKeyRefs) setVolumeSpecs(conf, KUBERNETES_DRIVER_VOLUMES_PREFIX, volumes) - new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, proxyUser) + new KubernetesDriverConf(conf, appId, mainAppResource, mainClass, appArgs, proxyUser, clock) } // scalastyle:on argcount diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index 06d322c9d19b..9d6a3de0f3bd 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -109,16 +109,17 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { } test("Long prefixes should switch to using a generated unique name.") { + val clock = new ManualClock() val sparkConf = new SparkConf(false) .set(KUBERNETES_NAMESPACE, "my-namespace") - val kconf = KubernetesTestConf.createDriverConf( - sparkConf = sparkConf, - resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX), - labels = DRIVER_LABELS) - val clock = new ManualClock() // Ensure that multiple services created at the same time generate unique names. val services = (1 to 10).map { _ => + val kconf = KubernetesTestConf.createDriverConf( + sparkConf = sparkConf, + resourceNamePrefix = Some(LONG_RESOURCE_NAME_PREFIX), + labels = DRIVER_LABELS, + clock = clock) val configurationStep = new DriverServiceFeatureStep(kconf, clock = clock) val serviceName = configurationStep .getAdditionalKubernetesResources() @@ -130,11 +131,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite { val hostAddress = configurationStep .getAdditionalPodSystemProperties()(DRIVER_HOST_ADDRESS.key) - (serviceName -> hostAddress) - }.toMap + Tuple3(kconf, serviceName, hostAddress) + } assert(services.size === 10) - services.foreach { case (name, address) => + services.foreach { case (kconf, name, address) => assert(!name.startsWith(kconf.resourceNamePrefix)) assert(!address.startsWith(kconf.resourceNamePrefix)) assert(InternetDomainName.isValid(address)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org