Repository: spark
Updated Branches:
  refs/heads/master 6d9994039 -> 2fe16333d


[SPARK-22778][KUBERNETES] Added the missing service metadata for 
KubernetesClusterManager

## What changes were proposed in this pull request?

This PR added the missing service metadata for `KubernetesClusterManager`. 
Without the metadata, the service loader couldn't load 
`KubernetesClusterManager`, and caused the driver to fail to create a 
`ExternalClusterManager`, as being reported in SPARK-22778. The PR also changed 
the `k8s:` prefix used to `k8s://`, which is what existing Spark on k8s users 
are familiar and used to.

## How was this patch tested?

Manual testing verified that the fix resolved the issue in SPARK-22778.

/cc vanzin felixcheung jiangxb1987

Author: Yinan Li <liyinan...@gmail.com>

Closes #19972 from liyinan926/fix-22778.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fe16333
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fe16333
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fe16333

Branch: refs/heads/master
Commit: 2fe16333d59cd8558afca3916821e1ea7e98d1bc
Parents: 6d99940
Author: Yinan Li <liyinan...@gmail.com>
Authored: Thu Dec 14 14:03:08 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Dec 14 14:03:08 2017 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/util/Utils.scala        | 6 +++---
 .../scala/org/apache/spark/deploy/SparkSubmitSuite.scala     | 2 +-
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala   | 8 ++++----
 .../org.apache.spark.scheduler.ExternalClusterManager        | 1 +
 .../deploy/k8s/submit/KubernetesClientApplication.scala      | 4 ++--
 5 files changed, 11 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2fe16333/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fe5b4ea..8871870 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2757,7 +2757,7 @@ private[spark] object Utils extends Logging {
 
   /**
    * Check the validity of the given Kubernetes master URL and return the 
resolved URL. Prefix
-   * "k8s:" is appended to the resolved URL as the prefix is used by 
KubernetesClusterManager
+   * "k8s://" is appended to the resolved URL as the prefix is used by 
KubernetesClusterManager
    * in canCreate to determine if the KubernetesClusterManager should be used.
    */
   def checkAndGetK8sMasterUrl(rawMasterURL: String): String = {
@@ -2770,7 +2770,7 @@ private[spark] object Utils extends Logging {
       val resolvedURL = s"https://$masterWithoutK8sPrefix";
       logInfo("No scheme specified for kubernetes master URL, so defaulting to 
https. Resolved " +
         s"URL is $resolvedURL.")
-      return s"k8s:$resolvedURL"
+      return s"k8s://$resolvedURL"
     }
 
     val masterScheme = new URI(masterWithoutK8sPrefix).getScheme
@@ -2789,7 +2789,7 @@ private[spark] object Utils extends Logging {
         throw new IllegalArgumentException("Invalid Kubernetes master scheme: 
" + masterScheme)
     }
 
-    return s"k8s:$resolvedURL"
+    s"k8s://$resolvedURL"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe16333/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 35594ec..2eb8a1f 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -408,7 +408,7 @@ class SparkSubmitSuite
     childArgsMap.get("--arg") should be (Some("arg1"))
     mainClass should be (KUBERNETES_CLUSTER_SUBMIT_CLASS)
     classpath should have length (0)
-    conf.get("spark.master") should be ("k8s:https://host:port";)
+    conf.get("spark.master") should be ("k8s://https://host:port";)
     conf.get("spark.executor.memory") should be ("5g")
     conf.get("spark.driver.memory") should be ("4g")
     conf.get("spark.kubernetes.namespace") should be ("spark")

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe16333/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 5c4e4ca..eaea6b0 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1148,16 +1148,16 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties with Logging {
 
   test("check Kubernetes master URL") {
     val k8sMasterURLHttps = 
Utils.checkAndGetK8sMasterUrl("k8s://https://host:port";)
-    assert(k8sMasterURLHttps === "k8s:https://host:port";)
+    assert(k8sMasterURLHttps === "k8s://https://host:port";)
 
     val k8sMasterURLHttp = 
Utils.checkAndGetK8sMasterUrl("k8s://http://host:port";)
-    assert(k8sMasterURLHttp === "k8s:http://host:port";)
+    assert(k8sMasterURLHttp === "k8s://http://host:port";)
 
     val k8sMasterURLWithoutScheme = 
Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1:8443")
-    assert(k8sMasterURLWithoutScheme === "k8s:https://127.0.0.1:8443";)
+    assert(k8sMasterURLWithoutScheme === "k8s://https://127.0.0.1:8443";)
 
     val k8sMasterURLWithoutScheme2 = 
Utils.checkAndGetK8sMasterUrl("k8s://127.0.0.1")
-    assert(k8sMasterURLWithoutScheme2 === "k8s:https://127.0.0.1";)
+    assert(k8sMasterURLWithoutScheme2 === "k8s://https://127.0.0.1";)
 
     intercept[IllegalArgumentException] {
       Utils.checkAndGetK8sMasterUrl("k8s:https://host:port";)

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe16333/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
 
b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000..81d1476
--- /dev/null
+++ 
b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager

http://git-wip-us.apache.org/repos/asf/spark/blob/2fe16333/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 4d17608..240a114 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -203,8 +203,8 @@ private[spark] class KubernetesClientApplication extends 
SparkApplication {
     val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
     val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
     // The master URL has been checked for validity already in SparkSubmit.
-    // We just need to get rid of the "k8s:" prefix here.
-    val master = sparkConf.get("spark.master").substring("k8s:".length)
+    // We just need to get rid of the "k8s://" prefix here.
+    val master = sparkConf.get("spark.master").substring("k8s://".length)
     val loggingInterval = if (waitForAppCompletion) 
Some(sparkConf.get(REPORT_INTERVAL)) else None
 
     val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to