Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/20451#discussion_r201512105 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala --- @@ -20,41 +20,65 @@ import java.io.File import java.util.concurrent.TimeUnit import com.google.common.cache.CacheBuilder -import io.fabric8.kubernetes.client.Config - -import org.apache.spark.{SparkContext, SparkException} -import org.apache.spark.deploy.k8s.{KubernetesUtils, SparkKubernetesClientFactory} +import io.fabric8.kubernetes.client.{Config, KubernetesClient} +import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.internal.Logging import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl} import org.apache.spark.util.{SystemClock, ThreadUtils} -private[spark] class KubernetesClusterManager extends ExternalClusterManager with Logging { +trait ManagerSpecificHandlers { + def createKubernetesClient(sparkConf: SparkConf): KubernetesClient +} - override def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s") +private[spark] class KubernetesClusterManager extends ExternalClusterManager + with ManagerSpecificHandlers with Logging { - override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = { - if (masterURL.startsWith("k8s") && - sc.deployMode == "client" && - !sc.conf.get(KUBERNETES_DRIVER_SUBMIT_CHECK).getOrElse(false)) { - throw new SparkException("Client mode is currently not supported for Kubernetes.") + class InClusterHandlers extends ManagerSpecificHandlers { + override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient = + SparkKubernetesClientFactory.createKubernetesClient( + 
KUBERNETES_MASTER_INTERNAL_URL, + Some(sparkConf.get(KUBERNETES_NAMESPACE)), + KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + sparkConf, + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_TOKEN_PATH)), + Some(new File(Config.KUBERNETES_SERVICE_ACCOUNT_CA_CRT_PATH))) + } + + class OutClusterHandlers extends ManagerSpecificHandlers { + override def createKubernetesClient(sparkConf: SparkConf): KubernetesClient = + SparkKubernetesClientFactory.createKubernetesClient( + sparkConf.get("spark.master").replace("k8s://", ""), + Some(sparkConf.get(KUBERNETES_NAMESPACE)), + KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, --- End diff -- The name of this prefix `spark.kubernetes.authenticate.driver.mounted` sounds weird in this case, given that the client is running outside the cluster. BTW: can we alternatively use the config at `$HOME/.kube/config` to build a Kubernetes client instead? I think this is a common approach for building clients outside a cluster.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org