This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new c224a966726e [SPARK-54260][CORE][K8S] Unify K8s cluster checks to use 
the single regex via `SparkMasterRegex.isK8s` methods
c224a966726e is described below

commit c224a966726e9811bae299bfefeecbf5e7f871c6
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Sun Nov 9 18:28:56 2025 -0800

    [SPARK-54260][CORE][K8S] Unify K8s cluster checks to use the single regex 
via `SparkMasterRegex.isK8s` methods
    
    ### What changes were proposed in this pull request?
    
    This PR aims to unify all K8s cluster checks to use the single regex via 
`SparkMasterRegex.isK8s` methods. In addition, this PR adds a new Scalastyle 
rule to prevent a regression which adds this code pattern manually again.
    
    ### Why are the changes needed?
    
    Although Apache Spark has a clear definition of the `K8s` cluster regex, there 
are many places that use improper string match patterns. We had better unify them 
to prevent human errors.
    
    **Official RegEx Definition**
    
https://github.com/apache/spark/blob/cc57743a229a70e2060230e3c9da49cbe24ad257/core/src/main/scala/org/apache/spark/SparkContext.scala#L3461
    
    **Manual String Match Logics**
    
    - Case 1: Only checking `k8s` prefix without checking `://`.
    ```
    masterURL.startsWith("k8s")
    ```
    ```
    args.master.startsWith("k8s")
    ```
    
    - Case 2: Checking `k8s://` via an independent string, which is 
error-prone.
    ```
    master.startsWith("k8s://")
    ```
    
    - Case 3: Checking with `Option` values, which we can simplify further.
    ```
    master.isDefined && master.get.startsWith("k8s://")
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the CIs, which ensure that all features are working correctly, and pass 
the Scala linter.
    
    **BEFORE**
    ```
    $ dev/scalastyle
    Scalastyle checks failed at following occurrences:
    [error] 
/Users/dongjoon/APACHE/spark-merge/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala:168:11:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala:78:22:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala:38:65:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/util/Utils.scala:2881:25:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/SparkContext.scala:457:15:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:173:56:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:260:22:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala:1044:22:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala:52:53:
 Use SparkMasterRegex.isK8s instead.
    [error] 
/Users/dongjoon/APACHE/spark-merge/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala:182:78:
 Use SparkMasterRegex.isK8s instead.
    [error] Total time: 23 s, completed Nov 9, 2025, 12:03:39 PM
    ```
    
    **AFTER**
    ```
    $ dev/scalastyle
    Scalastyle checks passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #52957 from dongjoon-hyun/SPARK-54260.
    
    Authored-by: Dongjoon Hyun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 5a454f86cd5fd95c0ce30454a170ac5001813385)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 core/src/main/scala/org/apache/spark/SparkContext.scala       | 11 ++++++++++-
 core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala |  6 +++---
 .../scala/org/apache/spark/resource/ResourceProfile.scala     |  5 +++--
 .../org/apache/spark/resource/ResourceProfileManager.scala    |  4 ++--
 core/src/main/scala/org/apache/spark/util/Utils.scala         |  2 +-
 .../spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala   |  4 ++--
 .../org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala     |  4 ++--
 .../scheduler/cluster/k8s/KubernetesClusterManager.scala      |  2 +-
 scalastyle-config.xml                                         |  5 +++++
 9 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2393851c6635..6f8be49e3959 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -454,7 +454,7 @@ class SparkContext(config: SparkConf) extends Logging {
 
     // Set Spark driver host and port system properties. This explicitly sets 
the configuration
     // instead of relying on the default value of the config constant.
-    if (master.startsWith("k8s") &&
+    if (SparkMasterRegex.isK8s(master) &&
         _conf.getBoolean("spark.kubernetes.executor.useDriverPodIP", false)) {
       logInfo("Use DRIVER_BIND_ADDRESS instead of DRIVER_HOST_ADDRESS as 
driver address " +
         "because spark.kubernetes.executor.useDriverPodIP is true in K8s 
mode.")
@@ -3459,6 +3459,15 @@ private object SparkMasterRegex {
   val SPARK_REGEX = """spark://(.*)""".r
   // Regular expression for connecting to kubernetes clusters
   val KUBERNETES_REGEX = """k8s://(.*)""".r
+
+  def isK8s(master: String) : Boolean = isK8s(Option(master))
+
+  def isK8s(master: Option[String]) : Boolean = {
+    master match {
+      case Some(KUBERNETES_REGEX(_)) => true
+      case _ => false
+    }
+  }
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala 
b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index b5d026e39a90..c3215b16f25e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -170,7 +170,7 @@ private[spark] class SparkSubmit extends Logging {
         // Here we are checking for client mode because when job is sumbitted 
in cluster
         // deploy mode with k8s resource manager, the spark submit in the 
driver container
         // is done in client mode.
-        val isKubernetesClusterModeDriver = args.master.startsWith("k8s") &&
+        val isKubernetesClusterModeDriver = 
SparkMasterRegex.isK8s(args.master) &&
           "client".equals(args.deployMode) &&
           sparkConf.getBoolean("spark.kubernetes.submitInDriver", false)
         if (isKubernetesClusterModeDriver) {
@@ -257,7 +257,7 @@ private[spark] class SparkSubmit extends Logging {
         v match {
           case "yarn" => YARN
           case m if m.startsWith("spark") => STANDALONE
-          case m if m.startsWith("k8s") => KUBERNETES
+          case m if SparkMasterRegex.isK8s(m) => KUBERNETES
           case m if m.startsWith("local") => LOCAL
           case _ =>
             error("Master must either be yarn or start with spark, k8s, or 
local")
@@ -1041,7 +1041,7 @@ private[spark] class SparkSubmit extends Logging {
         }
         throw cause
     } finally {
-      if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
+      if (SparkMasterRegex.isK8s(args.master) && 
!isShell(args.primaryResource) &&
           !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) &&
           !isConnectServer(args.mainClass)) {
         try {
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index 1b4b4f61016a..971b14265979 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException}
+import org.apache.spark.SparkMasterRegex._
 import org.apache.spark.annotation.{Evolving, Since}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys._
@@ -178,8 +179,8 @@ class ResourceProfile(
   // only applies to yarn/k8s
   private def shouldCheckExecutorCores(sparkConf: SparkConf): Boolean = {
     val master = sparkConf.getOption("spark.master")
-    sparkConf.contains(EXECUTOR_CORES) ||
-      (master.isDefined && (master.get.equalsIgnoreCase("yarn") || 
master.get.startsWith("k8s")))
+    sparkConf.contains(EXECUTOR_CORES) || isK8s(master) ||
+      (master.isDefined && master.get.equalsIgnoreCase("yarn"))
   }
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index 10121f6ef266..a3d76a92ddd8 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.mutable.HashMap
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkException, SparkMasterRegex}
 import org.apache.spark.annotation.Evolving
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.LogKeys
@@ -49,7 +49,7 @@ private[spark] class ResourceProfileManager(sparkConf: 
SparkConf,
   private val dynamicEnabled = Utils.isDynamicAllocationEnabled(sparkConf)
   private val master = sparkConf.getOption("spark.master")
   private val isYarn = master.isDefined && master.get.equals("yarn")
-  private val isK8s = master.isDefined && master.get.startsWith("k8s://")
+  private val isK8s = SparkMasterRegex.isK8s(master)
   private val isStandaloneOrLocalCluster = master.isDefined && (
       master.get.startsWith("spark://") || 
master.get.startsWith("local-cluster")
     )
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 81e86c82211c..fc735ae3b99e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2878,7 +2878,7 @@ private[spark] object Utils
    * in canCreate to determine if the KubernetesClusterManager should be used.
    */
   def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
-    require(rawMasterURL.startsWith("k8s://"),
+    require(SparkMasterRegex.isK8s(rawMasterURL),
       "Kubernetes master URL must start with k8s://.")
     val masterWithoutK8sPrefix = rawMasterURL.substring("k8s://".length)
 
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
index 9a1e79594e48..24f42919ce05 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
@@ -20,7 +20,7 @@ import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import org.apache.hadoop.util.StringUtils
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkMasterRegex}
 import org.apache.spark.deploy.SparkDiagnosticsSetter
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.Constants.EXIT_EXCEPTION_ANNOTATION
@@ -75,6 +75,6 @@ private[spark] class 
SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes
   }
 
   override def supports(clusterManagerUrl: String): Boolean = {
-    clusterManagerUrl.startsWith("k8s://")
+    SparkMasterRegex.isK8s(clusterManagerUrl)
   }
 }
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
index 17704b908558..bd8e0f97132d 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
@@ -23,7 +23,7 @@ import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkMasterRegex}
 import org.apache.spark.deploy.SparkSubmitOperation
 import org.apache.spark.deploy.k8s.{KubernetesUtils, 
SparkKubernetesClientFactory}
 import 
org.apache.spark.deploy.k8s.Config.{KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX, 
KUBERNETES_SUBMIT_GRACE_PERIOD}
@@ -165,7 +165,7 @@ private[spark] class K8SSparkSubmitOperation extends 
SparkSubmitOperation
   }
 
   override def supports(master: String): Boolean = {
-    master.startsWith("k8s://")
+    SparkMasterRegex.isK8s(master)
   }
 }
 
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
index 49eac64745b7..3fb1ed0c9c0f 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, 
ThreadUtils, Utils}
 private[spark] class KubernetesClusterManager extends ExternalClusterManager 
with Logging {
   import SparkMasterRegex._
 
-  override def canCreate(masterURL: String): Boolean = 
masterURL.startsWith("k8s")
+  override def canCreate(masterURL: String): Boolean = 
SparkMasterRegex.isK8s(masterURL)
 
   private def isLocal(conf: SparkConf): Boolean =
     conf.get(KUBERNETES_DRIVER_MASTER_URL).startsWith("local")
diff --git a/scalastyle-config.xml b/scalastyle-config.xml
index cea4cd3ac4fc..9bcdecbbf4fd 100644
--- a/scalastyle-config.xml
+++ b/scalastyle-config.xml
@@ -846,4 +846,9 @@ This file is divided into 3 sections:
     <parameters><parameter 
name="regex">\bInts\.checkedCast\b</parameter></parameters>
     <customMessage>Use JavaUtils.checkedCast instead.</customMessage>
   </check>
+
+  <check customId="startsWithK8s" level="error" 
class="org.scalastyle.file.RegexChecker" enabled="true">
+    <parameters><parameter 
name="regex">\bstartsWith\("k8s\b</parameter></parameters>
+    <customMessage>Use SparkMasterRegex.isK8s instead.</customMessage>
+  </check>
 </scalastyle>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to