This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new c224a966726e [SPARK-54260][CORE][K8S] Unify K8s cluster checks to use the single regex via `SparkMasterRegex.isK8s` methods
c224a966726e is described below
commit c224a966726e9811bae299bfefeecbf5e7f871c6
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Nov 9 18:28:56 2025 -0800
[SPARK-54260][CORE][K8S] Unify K8s cluster checks to use the single regex via `SparkMasterRegex.isK8s` methods
### What changes were proposed in this pull request?
This PR aims to unify all K8s cluster checks to use the single regex via
`SparkMasterRegex.isK8s` methods. In addition, this PR adds a new Scalastyle
rule to prevent regressions that reintroduce this code pattern manually.
### Why are the changes needed?
Although Apache Spark has a clear definition of the `K8s` cluster regex, there
are many places that use improper string-match patterns. We should unify them
to prevent human errors.
**Official RegEx Definition**
https://github.com/apache/spark/blob/cc57743a229a70e2060230e3c9da49cbe24ad257/core/src/main/scala/org/apache/spark/SparkContext.scala#L3461
**Manual String Match Logics**
- Case 1: Checking only the `k8s` prefix, without the `://` separator.
```
masterURL.startsWith("k8s")
```
```
args.master.startsWith("k8s")
```
- Case 2: Checking `k8s://` against an independent string literal, which is
error-prone.
```
master.startsWith("k8s://")
```
- Case 3: Checking `Option` values, which we can simplify further.
```
master.isDefined && master.get.startsWith("k8s://")
```
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the CIs, which ensure that all features work correctly, and pass the
Scala linter.
**BEFORE**
```
$ dev/scalastyle
Scalastyle checks failed at following occurrences:
[error] /Users/dongjoon/APACHE/spark-merge/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala:168:11: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala:78:22: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala:38:65: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/util/Utils.scala:2881:25: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/SparkContext.scala:457:15: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:173:56: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:260:22: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:1044:22: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:52:53: Use SparkMasterRegex.isK8s instead.
[error] /Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:182:78: Use SparkMasterRegex.isK8s instead.
[error] Total time: 23 s, completed Nov 9, 2025, 12:03:39 PM
```
**AFTER**
```
$ dev/scalastyle
Scalastyle checks passed.
```
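As a rough standalone check (not part of this patch), the regex from the new rule in the `scalastyle-config.xml` hunk below can be exercised directly to see which call sites it flags. The `StartsWithK8sRuleDemo` object and sample lines are illustrative only:

```scala
object StartsWithK8sRuleDemo {
  // Same pattern as the Scalastyle rule added in scalastyle-config.xml
  val banned = """\bstartsWith\("k8s\b""".r

  // True if a source line would trip the new linter rule
  def flags(line: String): Boolean = banned.findFirstIn(line).isDefined

  def main(args: Array[String]): Unit = {
    println(flags("""masterURL.startsWith("k8s")"""))       // true: flagged
    println(flags("""master.startsWith("k8s://")"""))       // true: flagged
    println(flags("""SparkMasterRegex.isK8s(masterURL)""")) // false: passes the linter
  }
}
```

The trailing `\b` lets one rule catch both the bare `"k8s"` and the `"k8s://"` string-match variants shown above.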
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #52957 from dongjoon-hyun/SPARK-54260.
Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 5a454f86cd5fd95c0ce30454a170ac5001813385)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
core/src/main/scala/org/apache/spark/SparkContext.scala | 11 ++++++++++-
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 6 +++---
.../scala/org/apache/spark/resource/ResourceProfile.scala | 5 +++--
.../org/apache/spark/resource/ResourceProfileManager.scala | 4 ++--
core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
.../spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala | 4 ++--
.../org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala | 4 ++--
.../scheduler/cluster/k8s/KubernetesClusterManager.scala | 2 +-
scalastyle-config.xml | 5 +++++
9 files changed, 29 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2393851c6635..6f8be49e3959 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -454,7 +454,7 @@ class SparkContext(config: SparkConf) extends Logging {
// Set Spark driver host and port system properties. This explicitly sets the configuration
// instead of relying on the default value of the config constant.
- if (master.startsWith("k8s") &&
+ if (SparkMasterRegex.isK8s(master) &&
_conf.getBoolean("spark.kubernetes.executor.useDriverPodIP", false)) {
logInfo("Use DRIVER_BIND_ADDRESS instead of DRIVER_HOST_ADDRESS as driver address " +
"because spark.kubernetes.executor.useDriverPodIP is true in K8s mode.")
@@ -3459,6 +3459,15 @@ private object SparkMasterRegex {
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connecting to kubernetes clusters
val KUBERNETES_REGEX = """k8s://(.*)""".r
+
+ def isK8s(master: String) : Boolean = isK8s(Option(master))
+
+ def isK8s(master: Option[String]) : Boolean = {
+ master match {
+ case Some(KUBERNETES_REGEX(_)) => true
+ case _ => false
+ }
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b5d026e39a90..c3215b16f25e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -170,7 +170,7 @@ private[spark] class SparkSubmit extends Logging {
// Here we are checking for client mode because when job is sumbitted in cluster
// deploy mode with k8s resource manager, the spark submit in the driver container
// is done in client mode.
- val isKubernetesClusterModeDriver = args.master.startsWith("k8s") &&
+ val isKubernetesClusterModeDriver = SparkMasterRegex.isK8s(args.master) &&
"client".equals(args.deployMode) &&
sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
if (isKubernetesClusterModeDriver) {
@@ -257,7 +257,7 @@ private[spark] class SparkSubmit extends Logging {
v match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
- case m if m.startsWith("k8s") => KUBERNETES
+ case m if SparkMasterRegex.isK8s(m) => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ =>
error("Master must either be yarn or start with spark, k8s, or local")
@@ -1041,7 +1041,7 @@ private[spark] class SparkSubmit extends Logging {
}
throw cause
} finally {
- if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
- if (SparkMasterRegex.isK8s(args.master) && !isShell(args.primaryResource) &&
!isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) &&
!isConnectServer(args.mainClass)) {
try {
diff --git
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 1b4b4f61016a..971b14265979 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException}
+import org.apache.spark.SparkMasterRegex._
import org.apache.spark.annotation.{Evolving, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKeys._
@@ -178,8 +179,8 @@ class ResourceProfile(
// only applies to yarn/k8s
private def shouldCheckExecutorCores(sparkConf: SparkConf): Boolean = {
val master = sparkConf.getOption("spark.master")
- sparkConf.contains(EXECUTOR_CORES) ||
- (master.isDefined && (master.get.equalsIgnoreCase("yarn") || master.get.startsWith("k8s")))
+ sparkConf.contains(EXECUTOR_CORES) || isK8s(master) ||
+ (master.isDefined && master.get.equalsIgnoreCase("yarn"))
}
/**
diff --git
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 10121f6ef266..a3d76a92ddd8 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable.HashMap
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkException, SparkMasterRegex}
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.internal.LogKeys
@@ -49,7 +49,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
private val master = sparkConf.getOption("spark.master")
private val isYarn = master.isDefined && master.get.equals("yarn")
- private val isK8s = master.isDefined && master.get.startsWith("k8s://")
+ private val isK8s = SparkMasterRegex.isK8s(master)
private val isStandaloneOrLocalCluster = master.isDefined && (
master.get.startsWith("spark://") ||
master.get.startsWith("local-cluster")
)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 81e86c82211c..fc735ae3b99e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2878,7 +2878,7 @@ private[spark] object Utils
* in canCreate to determine if the KubernetesClusterManager should be used.
*/
def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
- require(rawMasterURL.startsWith("k8s://"),
+ require(SparkMasterRegex.isK8s(rawMasterURL),
"Kubernetes master URL must start with k8s://.")
val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
index 9a1e79594e48..24f42919ce05 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
import io.fabric8.kubernetes.client.KubernetesClient
import org.apache.hadoop.util.StringUtils
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkMasterRegex}
import org.apache.spark.deploy.SparkDiagnosticsSetter
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants.EXIT_EXCEPTION_ANNOTATION
@@ -75,6 +75,6 @@ private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes
}
override def supports(clusterManagerUrl: String): Boolean = {
- clusterManagerUrl.startsWith("k8s://")
+ SparkMasterRegex.isK8s(clusterManagerUrl)
}
}
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
index 17704b908558..bd8e0f97132d 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
@@ -23,7 +23,7 @@ import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.client.KubernetesClient
import io.fabric8.kubernetes.client.dsl.PodResource
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkMasterRegex}
import org.apache.spark.deploy.SparkSubmitOperation
import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory}
import org.apache.spark.deploy.k8s.Config.{KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, KUBERNETES_SUBMIT_GRACE_PERIOD}
@@ -165,7 +165,7 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
}
override def supports(master: String): Boolean = {
- master.startsWith("k8s://")
+ SparkMasterRegex.isK8s(master)
}
}
diff --git
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 49eac64745b7..3fb1ed0c9c0f 100644
---
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging {
import SparkMasterRegex._
- override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s")
+ override def canCreate(masterURL: String): Boolean = SparkMasterRegex.isK8s(masterURL)
private def isLocal(conf: SparkConf): Boolean =
conf.get(KUBERNETES_DRIVER_MASTER_URL).startsWith("local")
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index cea4cd3ac4fc..9bcdecbbf4fd 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -846,4 +846,9 @@ This file is divided into 3 sections:
<parameters><parameter name="regex">\bInts\.checkedCast\b</parameter></parameters>
<customMessage>Use JavaUtils.checkedCast instead.</customMessage>
</check>
+
+ <check customId="startsWithK8s" level="error" class="org.scalastyle.file.RegexChecker" enabled="true">
+ <parameters><parameter name="regex">\bstartsWith\("k8s\b</parameter></parameters>
+ <customMessage>Use SparkMasterRegex.isK8s instead.</customMessage>
+ </check>
</scalastyle>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]