[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153131921
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
+
+val driverCustomAnnotations = ConfigurationUtils
+  .parsePrefixedKeyValuePairs(
+submissionSparkConf,
+KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+  s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as 
it is reserved for" +
+" Spark bookkeeping operations.")
+
+val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+  .map(env => new EnvVarBuilder()
+.withName(env._1)
+.withValue(env._2)
+.build())
+
+val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName)
+
+val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
+  submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
+
+val driverCpuQuantity = new QuantityBuilder(false)
+  .withAmount(driverCpuCores)
+  .build()
+val driverMemoryQuantity = new QuantityBuilder(false)
+  .withAmount(s"${driverMemoryMiB}Mi")
+  .build()
+val driverMemoryLimitQuantit

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153123413
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.steps.{BaseDriverConfigurationStep, DriverConfigurationStep, DriverKubernetesCredentialsStep, DriverServiceBootstrapStep}
+import org.apache.spark.util.SystemClock
+
+/**
+ * Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
+ */
+private[spark] class DriverConfigurationStepsOrchestrator(
+namespace: String,
+kubernetesAppId: String,
+launchTime: Long,
+mainAppResource: MainAppResource,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) {
+
+  // The resource name prefix is derived from the application name, making it easy to connect the
+  // names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
+  // application the user submitted. However, we can't use the application name in the label, as
+  // label values are considerably restrictive, e.g. must be no longer than 63 characters in
+  // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
+  // requires finding "all pods for this application" should use the kubernetesAppId.
+  private val kubernetesResourceNamePrefix =
+  s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+  private val dockerImagePullPolicy = submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+
+  def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
+val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
+  submissionSparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
--- End diff --

nit: `SPARK_ROLE_LABEL` also?
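
A minimal sketch of the extra guard being suggested, assuming SPARK_ROLE_LABEL is defined
in Constants alongside SPARK_APP_ID_LABEL (the message wording is illustrative, not the PR's):

    require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
      s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
    require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
      s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")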


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153098283
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
--- End diff --

mainAppResource need not be valid here
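
One reading of this note, sketched under the assumption that a submission may legitimately
carry no primary resource at all (cf. the SparkLauncher.NO_RESOURCE comment in a later
message): make the resource optional instead of required.

    // Hypothetical shape: carry the resource as an Option so the client does not
    // reject submissions that omit a primary resource.
    private[spark] case class ClientArguments(
        mainAppResource: Option[MainAppResource],
        mainClass: String,
        driverArgs: Array[String])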


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153132562
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a file.")
--- End diff --

I will need to read up on how k8s does security before I can comment more 
here ... would be great if @vanzin and @tgravescs took a look.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153093334
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
+*/
+  def run(): Unit = {
+var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
+// submissionSteps contain steps necessary to take, to resolve varying
+// client arguments that are passed in, created by orchestrator
+for (nextStep <- submissionSteps) {
+  currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
+}
+
+val resolvedDriverJavaOpts = currentDriverSpec
+  .driverSparkConf
+  // Remove this as the options are instead extracted and set individually below using
+  // environment variables with prefix SPARK_JAVA_OPT_.
+  .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+  .getAll
+  .map {
+case (confKey, confValue) => s"-D$confKey=$confValue"
+  } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithInde

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153092633
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,18 @@ object SparkSubmit extends CommandLineUtils with Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
+  childArgs ++= Array("--primary-java-resource", args.primaryResource)
--- End diff --

Handle primary resource == `SparkLauncher.NO_RESOURCE`?
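
A minimal sketch of the guard being asked for; SparkLauncher.NO_RESOURCE is the sentinel
SparkSubmit uses for resource-less submissions, and the conditional shape here is an
assumption rather than the PR's code:

    if (isKubernetesCluster) {
      childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
      // Only forward a primary resource when one was actually supplied.
      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
        childArgs ++= Array("--primary-java-resource", args.primaryResource)
      }
      childArgs ++= Array("--main-class", args.mainClass)
    }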


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153126963
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+  appId: String, maybeLoggingInterval: Option[Long])
+extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+maybeLoggingInterval.foreach { interval =>
+  require(interval > 0, s"Logging interval must be a positive time 
value, got: $interval ms.")
+  scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
+}
+  }
+
+  override def eventReceived(action: Action, pod: Pod): Unit = {
+this.pod = Option(pod)
+action match {
+  case Action.DELETED =>
+closeWatch()
+
+  case Action.ERROR =>
+closeWatch()
--- End diff --

No need for `hasCompleted`? Are all errors non-recoverable?
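
A hedged sketch of the distinction the question points at, with hasCompleted() as a
hypothetical helper that checks the pod for a terminal phase:

    override def eventReceived(action: Action, pod: Pod): Unit = {
      this.pod = Option(pod)
      action match {
        case Action.DELETED =>
          closeWatch()
        case Action.ERROR if hasCompleted() =>
          // Stop watching only once the pod has reached a terminal phase.
          closeWatch()
        case _ =>
          // Treat other events, including recoverable errors, as status updates.
          logShortStatus()
      }
    }

    // Hypothetical helper: "Succeeded" and "Failed" are the terminal pod phases.
    private def hasCompleted(): Boolean = phase == "Succeeded" || phase == "Failed"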


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153093170
  
--- Diff: resource-managers/kubernetes/core/pom.xml ---
@@ -0,0 +1,102 @@
+
+
+http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
--- End diff --

Hopefully the duplicated files/diffs between #19468 and #19717 are in sync.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153122738
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
+" spark-submit in cluster mode, this can also be passed to 
spark-submit via the" +
+" --kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag" +
+" format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses" +
+" this service account when requesting executor pods from the API 
server. If specific" +
+" credentials are given for the driver pod to use, the driver will 
favor" +
+" using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the" +
+" driver submission server. This is memory that accounts for 
things like VM overheads," +
+" interned strings, other native overheads, etc. This tends to 
grow with the driver's" +
+" memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is dynam

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153219134
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+private[spark] class Client(
--- End diff --

nit: add comments to describe the usage of this class and the meaning of each param.
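
A sketch of the kind of class comment being asked for; the wording is illustrative,
not the PR's:

    /**
     * Submits a Spark application to run on Kubernetes by creating the driver pod and
     * starting a watcher that logs the application status.
     *
     * @param submissionSteps steps that collectively configure the driver spec
     * @param submissionSparkConf the submission client's Spark configuration
     * @param kubernetesClient the client used to talk to the Kubernetes API server
     * @param waitForAppCompletion whether the client should block until the application finishes
     * @param appName the application name
     * @param loggingPodStatusWatcher a watcher that monitors and logs the driver pod's status
     */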


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153224675
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
+
+val driverCustomAnnotations = ConfigurationUtils
+  .parsePrefixedKeyValuePairs(
+submissionSparkConf,
+KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+  s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as 
it is reserved for" +
+" Spark bookkeeping operations.")
+
+val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+  .map(env => new EnvVarBuilder()
--- End diff --

nit: `new EnvVarBuilder()` in the next line
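
The layout being suggested, roughly:

    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
      .map { env =>
        new EnvVarBuilder()
          .withName(env._1)
          .withValue(env._2)
          .build()
      }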


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153222750
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+  appId: String, maybeLoggingInterval: Option[Long])
--- End diff --

nit: put each param on its own line.
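
The layout being suggested, roughly:

    private[k8s] class LoggingPodStatusWatcherImpl(
        appId: String,
        maybeLoggingInterval: Option[Long])
      extends LoggingPodStatusWatcher with Logging {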


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153216292
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SparkConf
+
+private[spark] object ConfigurationUtils {
+
+  /**
+   * Extract and parse Spark configuration properties with a given name prefix and
+   * return the result as a Map. Keys must not have more than one value.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @return a Map storing the configuration property keys and values
+   */
+  def parsePrefixedKeyValuePairs(
+  sparkConf: SparkConf,
+  prefix: String): Map[String, String] = {
+sparkConf.getAllWithPrefix(prefix).toMap
+  }
+
+  def requireSecondIfFirstIsDefined(
--- End diff --

This function is never used.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153228381
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless service. The service's
+ * ports should correspond to the ports that the executor will reach the pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind" +
+  " address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be" +
+  " managed via a Kubernetes service.")
+
+val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = clock.getTimeMillis()
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is" +
+s" too long (must be <= 63 characters). Falling back to use 
$shorterServiceName" +
--- End diff --

nit: `must be <= $MAX_SERVICE_NAME_LENGTH characters`
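
The suggested interpolation, sketched; the trailing clause completes the truncated
message and is illustrative:

    logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
      s" too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). Falling back to use" +
      s" $shorterServiceName as the driver service's name.")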


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153211367
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
--- End diff --

nit: for all the concatenated strings, always place the space at the end of one line instead of at the beginning of the next line.
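
The suggested style applied to the config doc above, roughly:

    .doc("The namespace that will be used for running the driver and executor pods. When using " +
      "spark-submit in cluster mode, this can also be passed to spark-submit via the " +
      "--kubernetes-namespace command line argument.")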


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153218673
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
--- End diff --

Do we already support `--primary-py-file` here?
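
If Python submissions are not wired up yet, one option (a sketch, not the PR's wording)
is to mention only the flag that exists:

    require(mainAppResource.isDefined,
      "Main app resource must be defined via --primary-java-resource.")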


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153220343
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
+*/
+  def run(): Unit = {
+var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
+// submissionSteps contain steps necessary to take, to resolve varying
+// client arguments that are passed in, created by orchestrator
+for (nextStep <- submissionSteps) {
+  currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
+}
+
+val resolvedDriverJavaOpts = currentDriverSpec
--- End diff --

Is this `resolvedDriverJavaOpts` or `resolvedDriverOpts`?


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153101406
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,18 @@ object SparkSubmit extends CommandLineUtils with Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
+  childArgs ++= Array("--primary-java-resource", args.primaryResource)
+  childArgs ++= Array("--main-class", args.mainClass)
+  if (args.childArgs != null) {
+args.childArgs.foreach { arg =>
+  childArgs += "--arg"
--- End diff --

Why not `childArgs += ("--arg", arg)`?
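
The suggested form; in Scala 2.x, Buffer.+= accepts (elem1, elem2, elems*), so both
tokens can be appended in one call:

    args.childArgs.foreach { arg =>
      childArgs += ("--arg", arg)
    }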


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153218210
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
--- End diff --

nit: add comments to describe the usage of this class and the meaning of each param.
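
A sketch of the kind of doc being asked for (illustrative wording):

    /**
     * Encapsulates arguments to the submission client.
     *
     * @param mainAppResource the main application resource
     * @param mainClass the main class of the application to run
     * @param driverArgs arguments to the driver
     */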


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153212548
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
+" spark-submit in cluster mode, this can also be passed to 
spark-submit via the" +
+" --kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag" +
+" format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses" +
+" this service account when requesting executor pods from the API 
server. If specific" +
+" credentials are given for the driver pod to use, the driver will 
favor" +
+" using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the" +
+" driver submission server. This is memory that accounts for 
things like VM overheads," +
+" interned strings, other native overheads, etc. This tends to 
grow with the driver's" +
+" memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is d
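
To make the configuration surface above concrete, a minimal sketch of a submission-side SparkConf using the quoted keys (key names are taken from the quoted Config.scala; all values are placeholders):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.kubernetes.namespace", "spark-jobs")
      .set("spark.kubernetes.driver.docker.image", "spark-driver:2.3.0")
      .set("spark.kubernetes.executor.docker.image", "spark-executor:2.3.0")
      .set("spark.kubernetes.docker.image.pullPolicy", "IfNotPresent")
      // Optional hard CPU limits; unset by default:
      .set("spark.kubernetes.driver.limit.cores", "1")
      .set("spark.kubernetes.executor.limit.cores", "2")
      // Optional off-heap overhead in MiB; when unset, the submission code
      // derives a default from the driver memory (typically 6-10% of it):
      .set("spark.kubernetes.driver.memoryOverhead", "512")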

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153216011
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153227490
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, 
Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use 
such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = 
submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a 
file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a 
file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a 
file.")
+
+val driverSparkConfWithCredentialsLocations = 
setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = 
kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  driverServiceAcc
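
The step above resolves credentials from keys assembled out of the prefixes and suffixes quoted earlier in Config.scala. A sketch of the resulting user-facing keys (all file paths are placeholders):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Credentials the submission client reads and uploads into a Kubernetes secret:
      .set("spark.kubernetes.authenticate.driver.oauthToken", "<token>")
      .set("spark.kubernetes.authenticate.driver.caCertFile", "/path/to/ca.pem")
      .set("spark.kubernetes.authenticate.driver.clientKeyFile", "/path/to/client.key")
      .set("spark.kubernetes.authenticate.driver.clientCertFile", "/path/to/client.pem")
      // Or point the driver at credential files already mounted into its pod:
      .set("spark.kubernetes.authenticate.driver.mounted.oauthTokenFile",
        "/var/run/secrets/kubernetes.io/serviceaccount/token")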

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153249656
  
--- Diff: resource-managers/kubernetes/core/pom.xml ---
@@ -0,0 +1,102 @@
+
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
--- End diff --

We will rebase this PR once #19468 gets merged.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153252119
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153252555
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and 
Kubernetes Resources
+*/
+  def run(): Unit = {
+var currentDriverSpec = 
KubernetesDriverSpec.initialSpec(submissionSparkConf)
+// submissionSteps contain steps necessary to take, to resolve varying
+// client arguments that are passed in, created by orchestrator
+for (nextStep <- submissionSteps) {
+  currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
+}
+
+val resolvedDriverJavaOpts = currentDriverSpec
+  .driverSparkConf
+  // Remove this as the options are instead extracted and set 
individually below using
+  // environment variables with prefix SPARK_JAVA_OPT_.
+  .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+  .getAll
+  .map {
+case (confKey, confValue) => s"-D$confKey=$confValue"
+  } ++ 
driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+val driverJavaOptsEnvs: Seq[EnvVar] = 
resolvedDriverJavaOpts.zipWithI
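
The quoted `run()` method is truncated right where the resolved Java options are converted into environment variables. Going by the in-code comment about the SPARK_JAVA_OPT_ prefix, the conversion plausibly has this shape (a sketch; the exact constant and helper names are assumptions):

    import io.fabric8.kubernetes.api.model.{EnvVar, EnvVarBuilder}

    val resolvedDriverJavaOpts = Seq("-Dspark.app.name=demo", "-XX:+UseG1GC")

    // One numbered SPARK_JAVA_OPT_<index> environment variable per option:
    val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
      case (option, index) =>
        new EnvVarBuilder()
          .withName(s"SPARK_JAVA_OPT_$index")
          .withValue(option)
          .build()
    }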

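A note on the argument parsing in the quoted `fromCommandLineArgs`: `sliding(2, 2)` walks the argument array in non-overlapping pairs, so every option must be followed by exactly one value. A standalone illustration:

    val args = Array("--main-class", "org.example.Main", "--arg", "a", "--arg", "b")

    // Non-overlapping windows of size 2:
    args.sliding(2, 2).toList.foreach(pair => println(pair.mkString(" ")))
    // --main-class org.example.Main
    // --arg a
    // --arg b

With an odd number of arguments the final window holds a single element, which matches the `case other` branch and triggers the "Unknown arguments" error.
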
[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153281380
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153281970
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366614
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,219 @@
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
--- End diff --

Wondering why?


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366627
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,18 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
+  childArgs ++= Array("--primary-java-resource", args.primaryResource)
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366646
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import 
org.apache.spark.deploy.k8s.submit.steps.{BaseDriverConfigurationStep, 
DriverConfigurationStep, DriverKubernetesCredentialsStep, 
DriverServiceBootstrapStep}
+import org.apache.spark.util.SystemClock
+
+/**
+ * Constructs the complete list of driver configuration steps to run to 
deploy the Spark driver.
+ */
+private[spark] class DriverConfigurationStepsOrchestrator(
+namespace: String,
+kubernetesAppId: String,
+launchTime: Long,
+mainAppResource: MainAppResource,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) {
+
+  // The resource name prefix is derived from the application name, making 
it easy to connect the
+  // names of the Kubernetes resources from e.g. kubectl or the Kubernetes 
dashboard to the
+  // application the user submitted. However, we can't use the application 
name in the label, as
+  // label values are considerably restrictive, e.g. must be no longer 
than 63 characters in
+  // length. So we generate a separate identifier for the app ID itself, 
and bookkeeping that
+  // requires finding "all pods for this application" should use the 
kubernetesAppId.
+  private val kubernetesResourceNamePrefix =
+  s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+  private val dockerImagePullPolicy = 
submissionSparkConf.get(DOCKER_IMAGE_PULL_POLICY)
+
+  def getAllConfigurationSteps(): Seq[DriverConfigurationStep] = {
+val driverCustomLabels = ConfigurationUtils.parsePrefixedKeyValuePairs(
+  submissionSparkConf,
+  KUBERNETES_DRIVER_LABEL_PREFIX)
+require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with 
key " +
--- End diff --

Yes, done.


---



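The prefix derivation described above is a plain string transformation: lowercase the application name, replace dots, and append the launch time. A standalone illustration:

    val appName = "SparkPi-2.3"
    val launchTime = 1511802958000L

    val kubernetesResourceNamePrefix =
      s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
    // "sparkpi-2-3-1511802958000"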

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366681
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,18 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
+  childArgs ++= Array("--primary-java-resource", args.primaryResource)
+  childArgs ++= Array("--main-class", args.mainClass)
+  if (args.childArgs != null) {
+args.childArgs.foreach { arg =>
+  childArgs += "--arg"
--- End diff --

Done.


---




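Combining the quoted diff with the earlier review suggestion, the resolved loop presumably reads:

    if (args.childArgs != null) {
      args.childArgs.foreach { arg =>
        childArgs += ("--arg", arg)
      }
    }
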
[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366657
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, 
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status 
logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, 
must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+  appId: String, maybeLoggingInterval: Option[Long])
+extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = 
pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+maybeLoggingInterval.foreach { interval =>
+  require(interval > 0, s"Logging interval must be a positive time 
value, got: $interval ms.")
+  scheduler.scheduleAtFixedRate(logRunnable, 0, interval, 
TimeUnit.MILLISECONDS)
+}
+  }
+
+  override def eventReceived(action: Action, pod: Pod): Unit = {
+this.pod = Option(pod)
+action match {
+  case Action.DELETED =>
+closeWatch()
+
+  case Action.ERROR =>
+closeWatch()
--- End diff --

`hasCompleted` is checked below. Refactored the cases.


---



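A plausible shape for that refactor, sketched for illustration only (the PR's final code may differ; `logLongStatus` and `hasCompleted` are assumed to be helpers defined elsewhere in the class):

    override def eventReceived(action: Action, pod: Pod): Unit = {
      this.pod = Option(pod)
      action match {
        case Action.DELETED | Action.ERROR =>
          closeWatch()
        case _ =>
          logLongStatus()
          if (hasCompleted()) {
            closeWatch()
          }
      }
    }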

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366758
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,219 @@
+private[spark] class Client(
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366725
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,219 @@
+private[spark] case class ClientArguments(
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366709
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366699
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366751
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
--- End diff --

No, not yet. Removed.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366717
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/ConfigurationUtils.scala ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s
+
+import org.apache.spark.SparkConf
+
+private[spark] object ConfigurationUtils {
+
+  /**
+   * Extract and parse Spark configuration properties with a given name prefix and
+   * return the result as a Map. Keys must not have more than one value.
+   *
+   * @param sparkConf Spark configuration
+   * @param prefix the given property name prefix
+   * @return a Map storing the configuration property keys and values
+   */
+  def parsePrefixedKeyValuePairs(
+  sparkConf: SparkConf,
+  prefix: String): Map[String, String] = {
+sparkConf.getAllWithPrefix(prefix).toMap
+  }
+
+  def requireSecondIfFirstIsDefined(
--- End diff --

Removed.
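
For context, the helper that stays behind is a thin wrapper over `SparkConf.getAllWithPrefix`, which returns every matching key with the prefix stripped. A quick illustration (a sketch; the annotation keys below are made up):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.ConfigurationUtils

val conf = new SparkConf(loadDefaults = false)
  .set("spark.kubernetes.driver.annotation.team", "infra")
  .set("spark.kubernetes.driver.annotation.owner", "alice")

// The prefix is stripped from the returned keys:
// Map("team" -> "infra", "owner" -> "alice")
val annotations = ConfigurationUtils.parsePrefixedKeyValuePairs(
  conf, "spark.kubernetes.driver.annotation.")
```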





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366693
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
--- End diff --

Done.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366772
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+  appId: String, maybeLoggingInterval: Option[Long])
--- End diff --

Done.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366783
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
+
+val driverCustomAnnotations = ConfigurationUtils
+  .parsePrefixedKeyValuePairs(
+submissionSparkConf,
+KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+  s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as 
it is reserved for" +
+" Spark bookkeeping operations.")
+
+val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+  .map(env => new EnvVarBuilder()
--- End diff --

Done.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366765
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
+*/
+  def run(): Unit = {
+var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
+// submissionSteps contain steps necessary to take, to resolve varying
+// client arguments that are passed in, created by orchestrator
+for (nextStep <- submissionSteps) {
+  currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
+}
+
+val resolvedDriverJavaOpts = currentDriverSpec
--- End diff --

`resolvedDriverJavaOpts`.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366798
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless service. The service's
+ * ports should correspond to the ports that the executor will reach the pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind" +
+  " address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be" +
+  " managed via a Kubernetes service.")
+
+val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = clock.getTimeMillis()
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is" +
+s" too long (must be <= 63 characters). Falling back to use $shorterServiceName" +
--- End diff --

Done.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153366790
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, 
Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a file.")
+
+val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  driverServiceAcco

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153375735
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
--- End diff --

For the case of the Spark examples, the Spark thrift server, etc., there is no main app resource; the bits are bundled as part of Spark itself.
You can take a look at how YARN or Mesos handles this case: I would assume something similar should suffice (look for usage of SparkLauncher.NO_RESOURCE).
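
For illustration, the analogous handling could look roughly like this (a sketch only; `parseMainAppResource` is a hypothetical helper, while `SparkLauncher.NO_RESOURCE` is the real "spark-internal" sentinel spark-submit uses for apps bundled with Spark):

```scala
import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, MainAppResource}
import org.apache.spark.launcher.SparkLauncher

// Sketch: map NO_RESOURCE to "no primary resource", so the examples and the
// thrift server can be submitted without --primary-java-resource.
def parseMainAppResource(resource: String): Option[MainAppResource] =
  resource match {
    case SparkLauncher.NO_RESOURCE => None
    case other => Some(JavaMainAppResource(other))
  }
```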





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread mridulm
Github user mridulm commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153375960
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+  appId: String, maybeLoggingInterval: Option[Long])
+extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+maybeLoggingInterval.foreach { interval =>
+  require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
+  scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS)
+}
+  }
+
+  override def eventReceived(action: Action, pod: Pod): Unit = {
+this.pod = Option(pod)
+action match {
+  case Action.DELETED =>
+closeWatch()
+
+  case Action.ERROR =>
+closeWatch()
--- End diff --

It is checked only in the case of `action != DELETED` and `action != ERROR`, right?
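
In other words, the control flow being asked about looks roughly like this (a sketch; `logLongStatus` and `hasCompleted` are assumed here to be the watcher's private helpers for detailed logging and terminal-phase detection):

```scala
override def eventReceived(action: Action, pod: Pod): Unit = {
  this.pod = Option(pod)
  action match {
    // The watch is closed unconditionally for these two actions...
    case Action.DELETED | Action.ERROR =>
      closeWatch()
    // ...so the completion check below only runs when
    // action != DELETED && action != ERROR (i.e. ADDED / MODIFIED).
    case _ =>
      logLongStatus()
      if (hasCompleted()) {
        closeWatch()
      }
  }
}
```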





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153400379
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
--- End diff --

Maybe we should do it in #19468 as well.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153401030
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using" +
--- End diff --

Sure, will update #19468.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153407820
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
--- End diff --

Here the style should be unified after #19631 is merged.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153408859
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 |  the node running the Application Master via the Secure
 |  Distributed Cache, for renewing the login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes cluster within which the
+|  application must be launched. The namespace must already
+|  exist in the cluster. (Default: default).
--- End diff --

We should also add checks for `validateKillArguments` and `validateStatusRequestArguments`.
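
For illustration, the kind of guard being suggested might look like this (a sketch; the method body and error wording here are hypothetical, not code from the PR):

```scala
// Sketch for SparkSubmitArguments: fail fast when --kill is used with a
// Kubernetes master, since kill requests are not wired up in this PR.
private def validateKillArguments(): Unit = {
  if (master.startsWith("k8s://")) {
    throw new IllegalArgumentException(
      "Killing submissions is not yet supported on Kubernetes.")
  }
  if (submissionToKill == null) {
    throw new IllegalArgumentException("Please specify a submission to kill.")
  }
}
```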





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153408574
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The driver pod uses " +
+"this service account when requesting executor pods from the API server. If specific " +
+"credentials are given for the driver pod to use, the driver will favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " +
+"driver submission server. This is memory that accounts for things like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is dyn

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153410482
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
--- End diff --

Did you add support for the configurations "spark.driver.userClassPathFirst" and "spark.executor.userClassPathFirst"? Sorry, I cannot find it.





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153407637
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
--- End diff --

This configuration takes one of a fixed set of string options, so we should validate against the legal values, like this SQL conf does:

```scala
  val CATALOG_IMPLEMENTATION = 
buildStaticConf("spark.sql.catalogImplementation")
.internal()
.stringConf
.checkValues(Set("hive", "in-memory"))
.createWithDefault("in-memory")
```

Besides, we should add `checkValue` for the `ConfigEntry`s if possible.
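
Applied to the pull-policy entry quoted above, that would look something like this (a sketch; `Always`, `Never` and `IfNotPresent` are the pull policies Kubernetes accepts):

```scala
val DOCKER_IMAGE_PULL_POLICY =
  ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
    .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
    .stringConf
    .checkValues(Set("Always", "Never", "IfNotPresent"))
    .createWithDefault("IfNotPresent")
```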





[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153405181
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverS

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153408117
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
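
The quoted doc comment is cut off by the archive, but the pattern it describes is the core of the submission client: each DriverConfigurationStep transforms the driver spec in turn, and the final spec is what gets submitted. A minimal sketch of that loop (illustrative only; it assumes a KubernetesDriverSpec.initialSpec factory alongside the quoted imports):

    // Start from a bare spec derived from the Spark conf, then let each
    // configuration step refine it before the driver pod is created.
    var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
    for (nextStep <- submissionSteps) {
      currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
    }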

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153414260
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
+
+RUN apk upgrade --no-cache && \
+apk add --no-cache bash tini && \
+mkdir -p /opt/spark && \
mkdir -p /opt/spark/work-dir && \
+touch /opt/spark/RELEASE && \
+rm /bin/sh && \
+ln -sv /bin/bash /bin/sh && \
+chgrp root /etc/passwd && chmod ug+rw /etc/passwd
+
+COPY jars /opt/spark/jars
+COPY bin /opt/spark/bin
+COPY sbin /opt/spark/sbin
+COPY conf /opt/spark/conf
+COPY dockerfiles/spark-base/entrypoint.sh /opt/
+
+ENV SPARK_HOME /opt/spark
+
+WORKDIR /opt/spark/work-dir
+
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
--- End diff --

nit: ditto.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153406057
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153403697
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
--- End diff --

nit: can we use `ArrayBuffer` explicitly?
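
For reference, the explicit form the nit asks for is a one-line change; a sketch:

    import scala.collection.mutable.ArrayBuffer

    // Naming the concrete type documents the append-heavy usage and the
    // cheap toArray conversion at the end of fromCommandLineArgs.
    val driverArgs = ArrayBuffer.empty[String]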


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153412770
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind" +
+  " address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be" +
+  " managed via a Kubernetes service.")
--- End diff --

nit: ditto about concat strings.
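
Assuming the earlier comment this "ditto" points at asked for trailing rather than leading spaces in split string literals, the suggested style would look like this (a sketch, using the same message as above):

    require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
      s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " +
        "address is managed and set to the driver pod's IP address.")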


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153414331
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
 ---
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# echo commands to the terminal output
+set -x
+
+# Check whether there is a passwd entry for the container UID
+myuid=$(id -u)
+mygid=$(id -g)
+uidentry=$(getent passwd $myuid)
+
+# If there is no passwd entry for the container UID, attempt to create one
+if [ -z "$uidentry" ] ; then
+if [ -w /etc/passwd ] ; then
+echo "$myuid:x:$myuid:$mygid:anonymous uid:$SPARK_HOME:/bin/false" 
>> /etc/passwd
+else
+echo "Container ENTRYPOINT failed to add passwd entry for 
anonymous UID"
+fi
+fi
+
+# Execute the container CMD under tini for better hygiene
+/sbin/tini -s -- "$@"
--- End diff --

nit: ditto.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153403573
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
--- End diff --

nit: `foreach` instead of `collect`?
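
The point of the suggestion: `collect` builds a result collection that is immediately discarded here, since the partial function runs only for its side effects. A sketch of the `foreach` form, with a total match so unknown flags still fail fast:

    args.sliding(2, 2).toList.foreach {
      case Array("--primary-java-resource", primaryJavaResource: String) =>
        mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
      case Array("--main-class", clazz: String) =>
        mainClass = Some(clazz)
      case Array("--arg", arg: String) =>
        driverArgs += arg
      case other =>
        throw new RuntimeException(s"Unknown arguments: ${other.mkString(" ")}")
    }

Note that `sliding(2, 2)` yields non-overlapping pairs, so Array("--main-class", "Foo", "--arg", "x") becomes List(Array("--main-class", "Foo"), Array("--arg", "x")).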


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153402807
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
+"this service account when requesting executor pods from the API 
server. If specific " +
+"credentials are given for the driver pod to use, the driver will 
favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the " +
+"driver submission server. This is memory that accounts for things 
like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow 
with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is dynami
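
The excerpt is truncated by the archive, but the ConfigBuilder pattern it shows is self-contained: each entry couples a key, a doc string, a type, and a default or Option. A minimal sketch of how such entries are read elsewhere in the module (illustrative only; SparkConf.get(entry) is the private[spark] typed accessor):

    val conf = new SparkConf()
    val namespace = conf.get(KUBERNETES_NAMESPACE)          // "default" unless overridden
    val driverImage = conf.get(DRIVER_DOCKER_IMAGE)         // s"spark-driver:$sparkVersion" by default
    val driverCoreLimit = conf.get(KUBERNETES_DRIVER_LIMIT_CORES)  // Option[String]; no default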

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153414219
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
--- End diff --

nit: could you add a line break at the end of the file?
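
The `env | grep SPARK_JAVA_OPT_` line in the CMD above is the consuming half of a contract: the submission client is expected to publish each driver JVM option as a numbered SPARK_JAVA_OPT_<n> environment variable on the container. A sketch of the producing half, assuming a driverJavaOptions: Option[String] read from spark.driver.extraJavaOptions and the fabric8 EnvVarBuilder quoted elsewhere in this thread:

    val driverJavaOptsEnvs = driverJavaOptions
      .map(org.apache.spark.util.Utils.splitCommandString)
      .getOrElse(Seq.empty)
      .zipWithIndex
      .map { case (option, index) =>
        // One env var per option so the entrypoint's readarray sees each intact.
        new EnvVarBuilder()
          .withName(s"SPARK_JAVA_OPT_$index")
          .withValue(option)
          .build()
      }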


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153538185
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153543569
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = 
submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = 
submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = 
submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = 
submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + 
memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
--- End diff --

No, I don't think so. @mccheah to confirm.
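
As a side note on the overhead arithmetic quoted above: assuming the usual values of 0.10 for MEMORY_OVERHEAD_FACTOR and 384 for MEMORY_OVERHEAD_MIN_MIB (both constants are defined elsewhere in this module), a 4 GiB driver works out as:

    val driverMemoryMiB = 4096L
    val memoryOverheadMiB = math.max((0.10 * driverMemoryMiB).toInt, 384)  // = 409
    val containerMemoryMiB = driverMemoryMiB + memoryOverheadMiB           // = 4505 MiB requested

so the container request always exceeds the JVM heap by at least the floor, protecting the pod from being OOM-killed for non-heap memory.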


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548646
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548631
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
+"this service account when requesting executor pods from the API 
server. If specific " +
+"credentials are given for the driver pod to use, the driver will 
favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the " +
+"driver submission server. This is memory that accounts for things 
like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow 
with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is dy

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548690
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548482
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by either --primary-py-file or 
--primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
--- End diff --

Got it, removed the check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548579
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548599
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
--- End diff --

Done.
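
A guard a follow-up could add here (hypothetical; not part of the quoted diff) is a check that the configured value is one Kubernetes itself accepts:

    val allowedPullPolicies = Set("Always", "Never", "IfNotPresent")
    val pullPolicy = conf.get(DOCKER_IMAGE_PULL_POLICY)
    require(allowedPullPolicies.contains(pullPolicy),
      s"Invalid pull policy $pullPolicy; must be one of ${allowedPullPolicies.mkString(", ")}.")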


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548666
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548556
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
--- End diff --

Noted.
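
For readers following the flow: SparkSubmit invokes this class and forwards the user's resource, class, and arguments through the flags that ClientArguments parses earlier in this thread. A hedged sketch of the shape of that dispatch (hypothetical; the exact lines are elided from the quoted diff):

    childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
    childArgs ++= Array("--primary-java-resource", args.primaryResource)
    childArgs ++= Array("--main-class", args.mainClass)
    args.childArgs.foreach { arg => childArgs ++= Array("--arg", arg) }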


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548730
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind" +
+  " address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be" +
+  " managed via a Kubernetes service.")
--- End diff --

Done.
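
For readers unfamiliar with headless services: setting clusterIP to None gives the driver a stable DNS name without load-balancing, so executors connect straight to the driver pod. A minimal sketch using the fabric8 ServiceBuilder imported in this diff (the name, selector, and port below are placeholders, not the step's exact values):

    import scala.collection.JavaConverters._
    import io.fabric8.kubernetes.api.model.ServiceBuilder

    val driverService = new ServiceBuilder()
      .withNewMetadata()
        .withName("spark-app-driver-svc")      // hypothetical resource name
        .endMetadata()
      .withNewSpec()
        .withClusterIP("None")                 // "None" makes the service headless
        .withSelector(Map("spark-role" -> "driver").asJava)
        .addNewPort()
          .withName("driver-rpc-port")
          .withPort(7078)                      // must match the driver's RPC port
          .endPort()
        .endSpec()
      .build()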


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548743
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images from a Spark
+# distribution, the docker build command should be invoked from the top level directory of the
+# Spark distribution. E.g.:
+# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
+readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS
--- End diff --

Done.
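
One detail worth calling out: the CMD above reconstructs the driver's JVM options from numbered SPARK_JAVA_OPT_ environment variables that the submission client sets on the container. A sketch of that naming contract from the Scala side (the variable prefix comes from the grep in the CMD; the loop itself is illustrative, not the PR's exact code):

    // Hypothetical list of driver JVM options to ship to the container.
    val driverJavaOpts = Seq("-Dspark.driver.port=7078", "-XX:+UseG1GC")
    val javaOptEnvVars = driverJavaOpts.zipWithIndex.map { case (opt, i) =>
      s"SPARK_JAVA_OPT_$i" -> opt  // picked up by `env | grep SPARK_JAVA_OPT_`
    }
    // javaOptEnvVars == Seq("SPARK_JAVA_OPT_0" -> "-Dspark.driver.port=7078",
    //                       "SPARK_JAVA_OPT_1" -> "-XX:+UseG1GC")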


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548705
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548760
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+# If this docker file is being used in the context of building your images from a Spark
+# distribution, the docker build command should be invoked from the top level directory of the
+# Spark distribution. E.g.:
+# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
+
+RUN apk upgrade --no-cache && \
+apk add --no-cache bash tini && \
+mkdir -p /opt/spark && \
+mkdir -p /opt/spark/work-dir && \
+touch /opt/spark/RELEASE && \
+rm /bin/sh && \
+ln -sv /bin/bash /bin/sh && \
+chgrp root /etc/passwd && chmod ug+rw /etc/passwd
+
+COPY jars /opt/spark/jars
+COPY bin /opt/spark/bin
+COPY sbin /opt/spark/sbin
+COPY conf /opt/spark/conf
+COPY dockerfiles/spark-base/entrypoint.sh /opt/
+
+ENV SPARK_HOME /opt/spark
+
+WORKDIR /opt/spark/work-dir
+
+ENTRYPOINT [ "/opt/entrypoint.sh" ]
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153548770
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
 ---
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# echo commands to the terminal output
+set -x
+
+# Check whether there is a passwd entry for the container UID
+myuid=$(id -u)
+mygid=$(id -g)
+uidentry=$(getent passwd $myuid)
+
+# If there is no passwd entry for the container UID, attempt to create one
+if [ -z "$uidentry" ] ; then
+if [ -w /etc/passwd ] ; then
+echo "$myuid:x:$myuid:$mygid:anonymous uid:$SPARK_HOME:/bin/false" >> /etc/passwd
+else
+echo "Container ENTRYPOINT failed to add passwd entry for anonymous UID"
+fi
+fi
+
+# Execute the container CMD under tini for better hygiene
+/sbin/tini -s -- "$@"
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153552488
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The driver pod uses " +
+"this service account when requesting executor pods from the API server. If specific " +
+"credentials are given for the driver pod to use, the driver will favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " +
+"driver submission server. This is memory that accounts for things like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is dy

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153581863
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
 ---
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
--- End diff --

We don't. It can be supported in the future. However, we also expect many 
of the use cases that normally would be handled by these properties to be 
manageable by building custom Docker images and letting those define the 
artifacts that are on the classpath.
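
Separately, since the quoted block also shows the driver memory sizing: a worked sketch of the overhead formula, with the two constants assumed here (a 0.10 factor and a 384 MiB floor, consistent with the "typically 6-10%" wording in the config docs):

    val MEMORY_OVERHEAD_FACTOR = 0.10   // assumed value
    val MEMORY_OVERHEAD_MIN_MIB = 384L  // assumed value
    def overheadMiB(driverMemoryMiB: Long): Long =
      math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)

    // overheadMiB(1024L) == 384 -> container requests 1024 + 384 = 1408 MiB
    // overheadMiB(8192L) == 819 -> container requests 8192 + 819 = 9011 MiB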


---




[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread mccheah
Github user mccheah commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153582956
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r15367
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.Buffer.empty[String]
+
+args.sliding(2, 2).toList.collect {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainAppResource.isDefined,
+  "Main app resource must be defined by --primary-java-resource.")
+require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ClientArguments(
+  mainAppResource.get,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-28 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r153678912
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The driver pod uses " +
+"this service account when requesting executor pods from the API server. If specific " +
+"credentials are given for the driver pod to use, the driver will favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " +
+"driver submission server. This is memory that accounts for things like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is dyn

[GitHub] spark pull request #19717: [SPARK-18278] [Submission] Spark on Kubernetes - ...

2017-11-10 Thread liyinan926
GitHub user liyinan926 opened a pull request:

https://github.com/apache/spark/pull/19717

[SPARK-18278] [Submission] Spark on Kubernetes - basic submission client

## What changes were proposed in this pull request?

This PR contains implementation of the basic submission client for the 
cluster mode of Spark on Kubernetes. It's step 2 from the step-wise plan 
documented 
[here](https://github.com/apache-spark-on-k8s/spark/issues/441#issuecomment-330802935).
This addition is covered by the 
[SPIP](http://apache-spark-developers-list.1001551.n3.nabble.com/SPIP-Spark-on-Kubernetes-td22147.html)
 vote which passed on Aug 31.

This PR and #19468 together form an MVP of Spark on Kubernetes that allows 
users to run Spark applications that use resources locally within the driver 
and executor containers on Kubernetes 1.6 and up. Some changes on pom and 
build/test setup are copied over from #19468 to make this PR self-contained and 
testable.

The submission client is mainly responsible for creating the Kubernetes pod 
that runs the Spark driver. It follows a step-based approach to construct the 
driver pod, as the code under the `submit.steps` package shows. The steps are 
orchestrated by `DriverConfigurationStepsOrchestrator`. `Client` creates the 
driver pod and waits for the application to complete if it's configured to do 
so, which is the case by default. 
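
In condensed form, the step-based construction works like a left fold: each DriverConfigurationStep consumes and returns an immutable driver spec. A sketch (not the PR's full code; the real KubernetesDriverSpec also carries the driver container, pod, and other Kubernetes resources):

    trait DriverConfigurationStep {
      def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
    }

    // Deliberately simplified spec for illustration.
    case class KubernetesDriverSpec(labels: Map[String, String])

    // The orchestrator's essence: thread the initial spec through every step.
    def buildDriverSpec(
        initialSpec: KubernetesDriverSpec,
        steps: Seq[DriverConfigurationStep]): KubernetesDriverSpec =
      steps.foldLeft(initialSpec)((spec, step) => step.configureDriver(spec))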

This PR also contains Dockerfiles of the driver and executor images. They 
are included because some of the environment variables set in the code would 
not make sense without referring to the Dockerfiles.

## How was this patch tested?

* The patch contains unit tests which are passing.
* Manual testing: ./build/mvn -Pkubernetes clean package succeeded.
* It is a subset of the entire changelist hosted at 
http://github.com/apache-spark-on-k8s/spark which is in active use in several 
organizations.
* There is integration testing enabled in the fork, currently hosted by 
Pepperdata, which is being moved over to RISELab CI.
* Detailed documentation on trying out the patch in its entirety is in: 
https://apache-spark-on-k8s.github.io/userdocs/running-on-kubernetes.html

cc @rxin @felixcheung @mateiz (shepherd)
k8s-big-data SIG members & contributors: @mccheah @foxish @ash211 @ssuchter 
@varunkatta @kimoonkim @erikerlandson @tnachen @ifilonenko


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache-spark-on-k8s/spark spark-kubernetes-4

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19717.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19717


commit 37c7ad6e8a4d107b69e7ec7842ae74446de229a0
Author: Yinan Li 
Date:   2017-11-10T00:28:10Z

Spark on Kubernetes - basic submission client




---
