[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154801257
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a file.")
+val clientCertDataBase64 = safeFileConfToBase64(
--- End diff --

/cc @mccheah.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154800789
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a file.")
+
+val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154800800
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a file.")
+
+val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  
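The quoted diff is cut off here by the archive at the `getOrElse`. For orientation only, the fabric8 builder pattern used above usually pairs the pod-level secret volume with a container-level mount; a rough, illustrative sketch (not the PR's exact code; the mount-path constant is an assumption) looks like:

    import io.fabric8.kubernetes.api.model.ContainerBuilder

    // Pairs with the addNewVolume(...) call on the pod spec quoted above: the driver
    // container mounts the secret-backed volume at a fixed path so the driver can read
    // the credential files at runtime.
    val driverContainerWithMountedSecretVolume = new ContainerBuilder(driverSpec.driverContainer)
      .addNewVolumeMount()
        .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
        .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR) // assumed constant, for illustration
        .endVolumeMount()
      .build()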

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154800815
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesFileUtils {
+
+  /**
+   * For the given collection of file URIs, resolves them as follows:
+   * - File URIs with scheme file:// are resolved to the given download path.
+   * - File URIs with scheme local:// resolve to just the path of the URI.
+   * - Otherwise, the URIs are returned as-is.
+   */
+  def resolveSubmittedUris(
+  fileUris: Iterable[String],
+  fileDownloadPath: String): Iterable[String] = {
+fileUris.map { uri =>
+  val fileUri = Utils.resolveURI(uri)
+  val fileScheme = Option(fileUri.getScheme).getOrElse("file")
+  fileScheme match {
--- End diff --

Done.
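For readers following along, the resolution rules in the Javadoc above can be sketched roughly as follows (illustrative only, not necessarily the PR's exact code; `Utils.resolveURI` is Spark's existing helper):

    import java.io.File

    import org.apache.spark.util.Utils

    def resolveSubmittedUris(
        fileUris: Iterable[String],
        fileDownloadPath: String): Iterable[String] = {
      fileUris.map { uri =>
        val fileUri = Utils.resolveURI(uri)
        // Treat URIs without a scheme as file:// URIs.
        val fileScheme = Option(fileUri.getScheme).getOrElse("file")
        fileScheme match {
          case "file" =>
            // file:// URIs are rewritten to point at the download path.
            val fileName = new File(fileUri.getPath).getName
            s"$fileDownloadPath/$fileName"
          case "local" =>
            // local:// URIs refer to files already on the image; keep only the path.
            fileUri.getPath
          case _ =>
            // Remote URIs (http, hdfs, ...) are returned as-is.
            uri
        }
      }
    }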


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154800754
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+.get(DRIVER_DOCKER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
--- End diff --

Merged both into `spark.driver.memoryOverhead` and used it in both yarn and 
k8s.
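For context, a single cluster-agnostic overhead setting along the lines of the comment above could be declared and consumed roughly like this (a sketch assuming Spark's internal ConfigBuilder/ByteUnit APIs; the doc text is illustrative):

    import org.apache.spark.internal.config.ConfigBuilder
    import org.apache.spark.network.util.ByteUnit

    // One entry shared by the YARN and Kubernetes backends instead of per-backend variants.
    val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
      .doc("Amount of non-heap memory to be allocated per driver in cluster mode, in MiB " +
        "unless otherwise specified.")
      .bytesConf(ByteUnit.MiB)
      .createOptional

    // Fallback mirrors the factor/minimum logic already used in BaseDriverConfigurationStep.
    val memoryOverheadMiB = submissionSparkConf
      .get(DRIVER_MEMORY_OVERHEAD)
      .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB))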


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154800774
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+.get(DRIVER_DOCKER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
+
+val driverCustomAnnotations = ConfigurationUtils
+  .parsePrefixedKeyValuePairs(
+submissionSparkConf,
+KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+  s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as 
it is reserved for" +
+" Spark bookkeeping operations.")
+
+val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+  .map(env =>
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154800102
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+.get(DRIVER_DOCKER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
--- End diff --

Agreed. But it seems `spark.driver.cores` is used in a lot of places. I think it needs a separate PR to unify all of them. It is also worth pointing out that the value of `spark.driver.cores` is used to set the CPU request, and in Kubernetes this can be fractional, e.g., `0.1` or `100m`. `DRIVER_CORES` in YARN, however, only accepts integer values.
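To illustrate the fractional-CPU point, the cores value ends up as a Kubernetes resource quantity on the driver container, roughly like this (a sketch using the fabric8 `QuantityBuilder` already imported in this file; variable names follow the diff above):

    import io.fabric8.kubernetes.api.model.QuantityBuilder

    // "1", "0.1" and "100m" are all valid Kubernetes CPU quantities, whereas YARN's
    // DRIVER_CORES only accepts whole numbers.
    val driverCpuQuantity = new QuantityBuilder(false)
      .withAmount(driverCpuCores)
      .build()
    val maybeCpuLimitQuantity = driverLimitCores.map { limitCores =>
      new QuantityBuilder(false).withAmount(limitCores).build()
    }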


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154783790
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+.get(DRIVER_DOCKER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154783519
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154782915
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+ClientArguments(
+  mainAppResource,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after each
+* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
+*/
+  def run(): Unit = 
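The quoted diff is truncated at `def run()`. The step-chaining described in the comment above amounts to folding the driver spec through each configuration step; a minimal sketch (assuming an `initialSpec` factory on `KubernetesDriverSpec`, and omitting the actual resource creation and status-watcher wiring):

    def run(): Unit = {
      // Each DriverConfigurationStep refines the spec produced by the previous one.
      val resolvedDriverSpec = submissionSteps.foldLeft(
          KubernetesDriverSpec.initialSpec(submissionSparkConf)) { (spec, step) =>
        step.configureDriver(spec)
      }
      // The resolved spec is then used to build the driver container, the driver pod and
      // any other Kubernetes resources before they are created through kubernetesClient.
    }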

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154782933
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+appId: String,
+maybeLoggingInterval: Option[Long])
+  extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+maybeLoggingInterval.foreach { interval =>
+  require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154782904
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -55,14 +63,35 @@ private[spark] object Config extends Logging {
   val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
 
   val KUBERNETES_SERVICE_ACCOUNT_NAME =
-ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
   .doc("Service account that is used when running the driver pod. The driver pod uses " +
 "this service account when requesting executor pods from the API server. If specific " +
 "credentials are given for the driver pod to use, the driver will favor " +
 "using those credentials instead.")
   .stringConf
   .createOptional
 
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154759879
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+.get(DRIVER_DOCKER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
+
+val driverCustomAnnotations = ConfigurationUtils
+  .parsePrefixedKeyValuePairs(
+submissionSparkConf,
+KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+  s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as 
it is reserved for" +
+" Spark bookkeeping operations.")
+
+val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+  .map(env =>
--- End diff --

`.map { env =>`
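That is, braces rather than parentheses for the multi-line closure, e.g. (illustrative):

    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
      .map { env =>
        new EnvVarBuilder()
          .withName(env._1)
          .withValue(env._2)
          .build()
      }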


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154749355
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 |  the node running the Application Master via the Secure
 |  Distributed Cache, for renewing the login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes cluster within which the
--- End diff --

Is it a big deal to not add this as a command line arg and force people to 
use the configuration instead? I'd prefer to not add even more cluster-specific 
switches to `SparkSubmit`, at least not until it is refactored to be pluggable.
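A configuration-only approach would have users set the namespace through a regular config entry instead of a dedicated switch, along the lines of (a sketch following the ConfigBuilder pattern used elsewhere in this PR; the doc text is illustrative):

    val KUBERNETES_NAMESPACE =
      ConfigBuilder("spark.kubernetes.namespace")
        .doc("The namespace that will be used for running the driver and executor pods.")
        .stringConf
        .createWithDefault("default")

which users would then supply with `--conf spark.kubernetes.namespace=...` on the spark-submit command line.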


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154759045
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
--- End diff --

Why use the full path all over the place instead of importing?
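That is, pulling the entries into scope once rather than spelling out the package on every access, e.g. (illustrative):

    import org.apache.spark.internal.config.{DRIVER_CLASS_PATH, DRIVER_MEMORY}

    private val driverExtraClasspath = submissionSparkConf.get(DRIVER_CLASS_PATH)
    private val driverMemoryMiB = submissionSparkConf.get(DRIVER_MEMORY)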


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154757417
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+appId: String,
+maybeLoggingInterval: Option[Long])
+  extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+maybeLoggingInterval.foreach { interval =>
+  require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
--- End diff --

You could use `.checkValue` in the constant declaration instead.
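For example, the interval constant could carry the validation itself (a sketch assuming Spark's ConfigBuilder API; the entry name is illustrative):

    import java.util.concurrent.TimeUnit

    import org.apache.spark.internal.config.ConfigBuilder

    val REPORT_INTERVAL =
      ConfigBuilder("spark.kubernetes.report.interval")
        .doc("Interval between reports of the current app status in cluster mode.")
        .timeConf(TimeUnit.MILLISECONDS)
        .checkValue(interval => interval > 0, "Logging interval must be a positive time value.")
        .createWithDefaultString("1s")

so the `require` in the watcher becomes unnecessary.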


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154755221
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala ---
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.steps._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.SystemClock
+
+/**
+ * Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
+ */
+private[spark] class DriverConfigurationStepsOrchestrator(
+namespace: String,
+kubernetesAppId: String,
+launchTime: Long,
+mainAppResource: Option[MainAppResource],
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) {
+
+  // The resource name prefix is derived from the application name, making it easy to connect the
+  // names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
+  // application the user submitted. However, we can't use the application name in the label, as
+  // label values are considerably restrictive, e.g. must be no longer than 63 characters in
+  // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
+  // requires finding "all pods for this application" should use the kubernetesAppId.
+  private val kubernetesResourceNamePrefix =
--- End diff --

Is this required to be unique? If so, this could use some more uniqueness 
(e.g. using a UUID instead of the "launch time").
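A UUID-based variant might look like this (a sketch; the sanitization only loosely mirrors the launch-time version referenced above):

    import java.util.UUID

    // A random suffix means two concurrent submissions of the same app name cannot
    // produce colliding Kubernetes resource names.
    private val kubernetesResourceNamePrefix = {
      val uuid = UUID.randomUUID().toString.replaceAll("-", "")
      s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
    }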


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154771100
  
--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a 
file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a 
file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a 
file.")
+
+val driverSparkConfWithCredentialsLocations = 
setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = 
kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154770325
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, 
Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use 
such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = 
submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a 
file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a 
file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a 
file.")
+
+val driverSparkConfWithCredentialsLocations = 
setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = 
kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154754517
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and 
Kubernetes Resources
+*/
+  def run(): Unit = {
   

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154770165
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, 
Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use 
such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = 
submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a 
file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a 
file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a 
file.")
+
+val driverSparkConfWithCredentialsLocations = 
setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = 
kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154750457
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -55,14 +63,35 @@ private[spark] object Config extends Logging {
   val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
 
   val KUBERNETES_SERVICE_ACCOUNT_NAME =
-ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
   .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
 "this service account when requesting executor pods from the API 
server. If specific " +
 "credentials are given for the driver pod to use, the driver will 
favor " +
 "using those credentials instead.")
   .stringConf
   .createOptional
 
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
--- End diff --

s/a single/each


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154771178
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, 
Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use 
such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = 
submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a 
file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a 
file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a 
file.")
+
+val driverSparkConfWithCredentialsLocations = 
setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = 
kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154759574
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = 
submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+.get(DRIVER_DOCKER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver Docker 
image"))
+
+  // CPU settings
+  private val driverCpuCores = 
submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = 
submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
--- End diff --

Another config that could use some refactoring so that YARN and k8s use the same logic.
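For reference, one shape such shared logic could take; the object name and constant values below are illustrative, not existing Spark code:

```scala
// Hypothetical shared helper that both the YARN and Kubernetes backends could call,
// instead of each re-implementing max(factor * memory, minimum) on its own.
object MemoryOverheadUtils {
  val MEMORY_OVERHEAD_FACTOR = 0.10   // illustrative value
  val MEMORY_OVERHEAD_MIN_MIB = 384L  // illustrative value

  def overheadMiB(memoryMiB: Long, explicitOverheadMiB: Option[Long]): Long =
    explicitOverheadMiB.getOrElse(
      math.max((MEMORY_OVERHEAD_FACTOR * memoryMiB).toLong, MEMORY_OVERHEAD_MIN_MIB))
}
```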


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154759746
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = 
submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+.get(DRIVER_DOCKER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver Docker 
image"))
+
+  // CPU settings
+  private val driverCpuCores = 
submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = 
submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + 
memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
--- End diff --

Fits in the previous line.
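That is, the whitespace-only change, using the one-line signature as it appears elsewhere in the PR:

```scala
  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
```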


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154759371
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
 ---
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = 
submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+.get(DRIVER_DOCKER_IMAGE)
+.getOrElse(throw new SparkException("Must specify the driver Docker 
image"))
+
+  // CPU settings
+  private val driverCpuCores = 
submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
--- End diff --

You could move the `DRIVER_CORES` config from the YARN module to core.
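As a sketch, the shared definition in core could look roughly like this (placement and doc text are assumptions based on the existing `spark.driver.cores` semantics):

```scala
// Hypothetical entry in org.apache.spark.internal.config, usable by both YARN and k8s.
private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
  .doc("Number of cores to use for the driver process, in cluster mode.")
  .intConf
  .createWithDefault(1)
```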


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154750025
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala ---
@@ -388,6 +388,32 @@ class SparkSubmitSuite
 conf.get("spark.ui.enabled") should be ("false")
   }
 
+  test("handles k8s cluster mode") {
+val clArgs = Seq(
+  "--deploy-mode", "cluster",
+  "--master", "k8s://host:port",
+  "--executor-memory", "5g",
+  "--class", "org.SomeClass",
+  "--kubernetes-namespace", "foo",
+  "--driver-memory", "4g",
+  "--conf", "spark.kubernetes.driver.docker.image=bar",
+  "/home/thejar.jar",
+  "arg1")
+val appArgs = new SparkSubmitArguments(clArgs)
+val (childArgs, classpath, conf, mainClass) = 
prepareSubmitEnvironment(appArgs)
+
+val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap
+childArgsMap.get("--primary-java-resource") should be 
(Some("file:/home/thejar.jar"))
+childArgsMap.get("--main-class") should be (Some("org.SomeClass"))
+childArgsMap.get("--arg") should be (Some("arg1"))
+mainClass should be ("org.apache.spark.deploy.k8s.submit.Client")
+classpath should have length (0)
+conf.get("spark.executor.memory") should be ("5g")
--- End diff --

Check `spark.master` too?
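For example, a hedged form of the extra assertion that does not assume how `prepareSubmitEnvironment` rewrites the k8s master URL:

```scala
// Only the scheme prefix is checked, since the resolved form depends on the URL-resolution logic.
conf.get("spark.master") should startWith ("k8s://")
```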


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154771774
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, 
Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use 
such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = 
submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a 
file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a 
file.")
+val clientCertDataBase64 = safeFileConfToBase64(
--- End diff --

Not really familiar with how these things are used by k8s, but don't these certs
generally have passwords? I can't seem to find anything related to passwords for
these things.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154751082
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
 "must be a positive integer")
   .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+  .doc("In cluster mode, whether to wait for the application to finish 
before exiting the " +
+"launcher process.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+ConfigBuilder("spark.kubernetes.report.interval")
+  .doc("Interval between reports of the current app status in cluster 
mode.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+  .doc("Location to download jars to in the driver and executors. When 
using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pod.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-jars")
--- End diff --

The doc string says "download jars to". Is it guaranteed that this 
directory will be writable? Generally only root can write to things in "/var" 
by default, and I assume you're not running things as root even if it's inside 
a containers.
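For context, the config doc above says the directory is mounted as an emptyDir volume, which is what would make it writable for a non-root user; a minimal fabric8 sketch (volume and mount names are illustrative):

```scala
import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder}

// An emptyDir volume is writable by the pod's user regardless of the image's /var permissions.
val podWithJarsVolume = new PodBuilder()
  .withNewSpec()
    .addNewVolume()
      .withName("spark-jars")
      .withNewEmptyDir().endEmptyDir()
    .endVolume()
  .endSpec()
  .build()

val containerWithJarsMount = new ContainerBuilder()
  .addNewVolumeMount()
    .withName("spark-jars")
    .withMountPath("/var/spark-data/spark-jars")
  .endVolumeMount()
  .build()
```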


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154756363
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala
 ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesFileUtils {
+
+  /**
+   * For the given collection of file URIs, resolves them as follows:
+   * - File URIs with scheme file:// are resolved to the given download 
path.
+   * - File URIs with scheme local:// resolve to just the path of the URI.
+   * - Otherwise, the URIs are returned as-is.
+   */
+  def resolveSubmittedUris(
+  fileUris: Iterable[String],
+  fileDownloadPath: String): Iterable[String] = {
+fileUris.map { uri =>
+  val fileUri = Utils.resolveURI(uri)
+  val fileScheme = Option(fileUri.getScheme).getOrElse("file")
+  fileScheme match {
--- End diff --

This looks a lot like `resolveFilePaths`, you could probably merge the two 
in some way.
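A rough sketch of how the two could share one per-URI resolution function; the `file://` mapping shown is an assumption, since that branch is truncated in the quote above:

```scala
import java.io.File
import org.apache.spark.util.Utils

// Hypothetical merged helper used by both resolveSubmittedUris and resolveFilePaths.
private def resolveFileUri(uri: String, fileDownloadPath: String): String = {
  val fileUri = Utils.resolveURI(uri)
  Option(fileUri.getScheme).getOrElse("file") match {
    case "file"  => s"$fileDownloadPath/${new File(fileUri.getPath).getName}"
    case "local" => fileUri.getPath
    case _       => uri
  }
}

def resolveSubmittedUris(fileUris: Iterable[String], fileDownloadPath: String): Iterable[String] =
  fileUris.map(resolveFileUri(_, fileDownloadPath))
```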


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154746008
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154726920
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
--- End diff --

Seems openjdk is under GPL2. Should they be listed following `This product 
optionally depends on 'Webbit'` in a similar format and with the license files 
included under `license`?  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154724747
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
+
+RUN apk upgrade --no-cache && \
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154721032
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, 
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status 
logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, 
must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+appId: String,
+maybeLoggingInterval: Option[Long])
+  extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = 
pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+maybeLoggingInterval.foreach { interval =>
+  require(interval > 0, s"Logging interval must be a positive time 
value, got: $interval ms.")
+  scheduler.scheduleAtFixedRate(logRunnable, 0, interval, 
TimeUnit.MILLISECONDS)
+}
+  }
+
+  override def eventReceived(action: Action, pod: Pod): Unit = {
+this.pod = Option(pod)
+action match {
+  case Action.DELETED | Action.ERROR =>
+closeWatch()
+
+  case _ =>
+logLongStatus()
+if (hasCompleted()) {
+  closeWatch()
+}
+}
+  }
+
+  override def onClose(e: KubernetesClientException): Unit = {
+logDebug(s"Stopping watching application $appId with last-observed 
phase $phase")
+closeWatch()
+  }
+
+  private def logShortStatus() = {
+logInfo(s"Application status for $appId (phase: $phase)")
+  }
+
+  private def logLongStatus() = {
+logInfo("State changed, new state: " + 
pod.map(formatPodState).getOrElse("unknown"))
+  }
+
+  private def hasCompleted(): Boolean = {
+phase == "Succeeded" || phase == "Failed"
+  }
+
+  private def closeWatch(): Unit = {
+podCompletedFuture.countDown()
+scheduler.shutdown()
+  }
+
+  private def formatPodState(pod: Pod): String = {
+// TODO include specific container state
--- End diff --

Actually it already includes the `containerStatuses`. Removed this TODO.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154719522
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
--- End diff --

Done.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154707798
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
+"this service account when requesting executor pods from the API 
server. If specific " +
+"credentials are given for the driver pod to use, the driver will 
favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the " +
+"driver submission server. This is memory that accounts for things 
like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow 
with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is 

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154707001
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
 "must be a positive integer")
   .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+  .doc("In cluster mode, whether to wait for the application to finish 
before exiting the " +
+"launcher process.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+ConfigBuilder("spark.kubernetes.report.interval")
+  .doc("Interval between reports of the current app status in cluster 
mode.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+  .doc("Location to download jars to in the driver and executors. When 
using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pod.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-jars")
+
+  private[spark] val FILES_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
+  .doc("Location to download files to in the driver and executors. 
When using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pods.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-files")
+
+  val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
+"spark.kubernetes.authenticate.submission"
+
   val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+
+  val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
+  val KUBERNETES_DRIVER_ANNOTATION_PREFIX = 
"spark.kubernetes.driver.annotation."
+
+  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = 
"spark.kubernetes.executor.annotation."
+
+  val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+
+  def getK8sMasterUrl(rawMasterString: String): String = {
+require(rawMasterString.startsWith("k8s://"),
+  "Master URL should start with k8s:// in Kubernetes mode.")
+val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
+if (masterWithoutK8sPrefix.startsWith("http://")
--- End diff --

Done.
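
A minimal, self-contained sketch of the master-URL resolution being discussed above, for readers following along; the HTTPS fallback and the wrapper object are assumptions for illustration, not necessarily the merged code:

```scala
object K8sMasterUrlSketch {
  // Resolve a k8s:// master string into an API server URL.
  def getK8sMasterUrl(rawMasterString: String): String = {
    require(rawMasterString.startsWith("k8s://"),
      "Master URL should start with k8s:// in Kubernetes mode.")
    val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
    if (masterWithoutK8sPrefix.startsWith("http://") ||
        masterWithoutK8sPrefix.startsWith("https://")) {
      masterWithoutK8sPrefix
    } else {
      // Assumption: a bare host:port is treated as an HTTPS API server address.
      s"https://$masterWithoutK8sPrefix"
    }
  }

  def main(args: Array[String]): Unit = {
    println(getK8sMasterUrl("k8s://https://kubernetes.default.svc:443"))
    println(getK8sMasterUrl("k8s://kubernetes.default.svc:443"))
  }
}
```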


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-04 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154694277
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind " +
+  "address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be " +
+  "managed via a Kubernetes service.")
+
+val preferredServiceName = 
s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = clock.getTimeMillis()
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is " +
+s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " +
+s"$shorterServiceName as the driver service's name.")
+  shorterServiceName
+}
+
+val driverPort = submissionSparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT)
+val driverBlockManagerPort = submissionSparkConf.getInt(
+org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, 
DEFAULT_BLOCKMANAGER_PORT)
+val driverService = new ServiceBuilder()
+  .withNewMetadata()
+.withName(resolvedServiceName)
+.endMetadata()
+  .withNewSpec()
+.withClusterIP("None")
+.withSelector(driverLabels.asJava)
+.addNewPort()
+  .withName(DRIVER_PORT_NAME)
+  .withPort(driverPort)
+  .withNewTargetPort(driverPort)
+  .endPort()
+.addNewPort()
+  .withName(BLOCK_MANAGER_PORT_NAME)
+  .withPort(driverBlockManagerPort)
+  .withNewTargetPort(driverBlockManagerPort)
+  .endPort()
+.endSpec()
+  .build()
+
+val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+val driverHostname = 
s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+  .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, 
driverHostname)
--- End diff --

Ah, yeah, done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154572754
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind " +
+  "address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be " +
+  "managed via a Kubernetes service.")
+
+val preferredServiceName = 
s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = clock.getTimeMillis()
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is " +
+s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " +
+s"$shorterServiceName as the driver service's name.")
+  shorterServiceName
+}
+
+val driverPort = submissionSparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT)
+val driverBlockManagerPort = submissionSparkConf.getInt(
+org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, 
DEFAULT_BLOCKMANAGER_PORT)
+val driverService = new ServiceBuilder()
+  .withNewMetadata()
+.withName(resolvedServiceName)
+.endMetadata()
+  .withNewSpec()
+.withClusterIP("None")
+.withSelector(driverLabels.asJava)
+.addNewPort()
+  .withName(DRIVER_PORT_NAME)
+  .withPort(driverPort)
+  .withNewTargetPort(driverPort)
+  .endPort()
+.addNewPort()
+  .withName(BLOCK_MANAGER_PORT_NAME)
+  .withPort(driverBlockManagerPort)
+  .withNewTargetPort(driverBlockManagerPort)
+  .endPort()
+.endSpec()
+  .build()
+
+val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+val driverHostname = 
s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+  .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, 
driverHostname)
--- End diff --

I meant: isn't `org.apache.spark.internal.config.DRIVER_HOST_ADDRESS` just `DRIVER_HOST_KEY`?
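
To make the question concrete, a tiny sketch assuming `DRIVER_HOST_KEY` is the raw `spark.driver.host` string while `org.apache.spark.internal.config.DRIVER_HOST_ADDRESS` is the typed `ConfigEntry` built on that same key (both are assumptions here, not quotes from the PR):

```scala
import org.apache.spark.SparkConf

object DriverHostKeySketch {
  // Assumption: this is the raw key behind both names in the discussion.
  val DRIVER_HOST_KEY = "spark.driver.host"

  def main(args: Array[String]): Unit = {
    // Setting the typed entry inside Spark writes the same underlying
    // property that setting the raw key does here.
    val conf = new SparkConf(loadDefaults = false)
      .set(DRIVER_HOST_KEY, "my-app-1234-driver-svc.default.svc.cluster.local")

    println(conf.getOption(DRIVER_HOST_KEY))
  }
}
```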


---


[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154569020
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
 ---
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# echo commands to the terminal output
+set -x
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154568773
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +604,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

Actually, k8s does not yet support `kill` and `status`, nor does it support `spark.cores.max`. Updated `validateKillArguments` and `validateStatusRequestArguments` accordingly.
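
A small, self-contained sketch of the check this implies, assuming the validation simply keeps rejecting `k8s://` masters for kill and status requests (the helper name and wiring are illustrative only, not the merged code):

```scala
object SubmitValidationSketch {
  // Kill and status requests stay limited to standalone and Mesos masters,
  // per the comment above that Kubernetes does not support them yet.
  def supportsKillAndStatus(master: String): Boolean =
    master.startsWith("spark://") || master.startsWith("mesos://")

  def main(args: Array[String]): Unit = {
    println(supportsKillAndStatus("spark://host:7077"))        // true
    println(supportsKillAndStatus("k8s://https://host:6443"))  // false -> would printErrorAndExit in SparkSubmit
  }
}
```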


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154567885
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

I think if we eventually decide not to have default Docker images, we should turn these options into `--param` ones. I'm not sure we want to make that call in this PR, though. Can we defer this until we are clearer on how we publish and maintain the images?


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154567054
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt && \
+readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp 
"$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY 
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS 
$SPARK_DRIVER_ARGS
--- End diff --

I think this is debatable. They have some env variables with similar purposes, but they also have role-specific arguments/properties that probably warrant separate images.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154566501
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
 "must be a positive integer")
   .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+  .doc("In cluster mode, whether to wait for the application to finish 
before exiting the " +
+"launcher process.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+ConfigBuilder("spark.kubernetes.report.interval")
+  .doc("Interval between reports of the current app status in cluster 
mode.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+  .doc("Location to download jars to in the driver and executors. When 
using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pod.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-jars")
+
+  private[spark] val FILES_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
+  .doc("Location to download files to in the driver and executors. 
When using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pods.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-files")
--- End diff --

See comment above.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154566357
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and 
Kubernetes 

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154566070
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile 
.
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt && \
+readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then 
SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" 
-Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY 
-Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
$SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores 
$SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname 
$SPARK_EXECUTOR_POD_IP
--- End diff --

`SPARK_EXECUTOR_PORT` is no longer set, as `spark.executor.port` is no longer used by Spark. Removed `SPARK_EXECUTOR_PORT`. `SPARK_MOUNTED_CLASSPATH` is set in `DependencyResolutionStep`.
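
As a rough illustration of what setting it in a configuration step can look like with the fabric8 builder API, a hedged sketch; the helper name and sample classpath entries are assumptions, not the PR's exact code:

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

object MountedClasspathSketch {
  // Export the resolved classpath to the container as SPARK_MOUNTED_CLASSPATH,
  // which the image entrypoint then prepends to SPARK_CLASSPATH.
  def withMountedClasspath(container: Container, resolvedClasspath: Seq[String]): Container =
    new ContainerBuilder(container)
      .addNewEnv()
        .withName("SPARK_MOUNTED_CLASSPATH")
        .withValue(resolvedClasspath.mkString(java.io.File.pathSeparator))
        .endEnv()
      .build()

  def main(args: Array[String]): Unit = {
    val base = new ContainerBuilder().withName("spark-kubernetes-driver").build()
    val updated = withMountedClasspath(
      base, Seq("/var/spark-data/spark-jars/*", "/opt/spark/examples/jars/*"))
    println(updated.getEnv)
  }
}
```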


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154566094
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind " +
+  "address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be " +
+  "managed via a Kubernetes service.")
+
+val preferredServiceName = 
s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = clock.getTimeMillis()
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is " +
+s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " +
+s"$shorterServiceName as the driver service's name.")
+  shorterServiceName
+}
+
+val driverPort = submissionSparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT)
+val driverBlockManagerPort = submissionSparkConf.getInt(
+org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, 
DEFAULT_BLOCKMANAGER_PORT)
+val driverService = new ServiceBuilder()
+  .withNewMetadata()
+.withName(resolvedServiceName)
+.endMetadata()
+  .withNewSpec()
+.withClusterIP("None")
+.withSelector(driverLabels.asJava)
+.addNewPort()
+  .withName(DRIVER_PORT_NAME)
+  .withPort(driverPort)
+  .withNewTargetPort(driverPort)
+  .endPort()
+.addNewPort()
+  .withName(BLOCK_MANAGER_PORT_NAME)
+  .withPort(driverBlockManagerPort)
+  .withNewTargetPort(driverBlockManagerPort)
+  .endPort()
+.endSpec()
+  .build()
+
+val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+val driverHostname = 
s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+  .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, 
driverHostname)
--- End diff --

What do you mean here?


---


[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154565597
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -296,6 +298,12 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   case (STANDALONE, CLUSTER) if args.isR =>
 printErrorAndExit("Cluster deploy mode is currently not supported 
for R " +
   "applications on standalone clusters.")
+  case (KUBERNETES, CLIENT) =>
+printErrorAndExit("Client mode is currently not supported for 
Kubernetes.")
+  case (KUBERNETES, _) if args.isPython =>
+printErrorAndExit("Python applications are currently not supported 
for Kubernetes.")
+  case (KUBERNETES, _) if args.isR =>
+printErrorAndExit("R applications are currently not supported for 
Kubernetes.")
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154564066
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -251,6 +252,7 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
 YARN
   case m if m.startsWith("spark") => STANDALONE
   case m if m.startsWith("mesos") => MESOS
+  case m if m.startsWith("k8s") => KUBERNETES
   case m if m.startsWith("local") => LOCAL
   case _ =>
 printErrorAndExit("Master must either be yarn or start with spark, 
mesos, local")
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154554648
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.ServiceBuilder
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Clock
+
+/**
+ * Allows the driver to be reachable by executor pods through a headless 
service. The service's
+ * ports should correspond to the ports that the executor will reach the 
pod at for RPC.
+ */
+private[spark] class DriverServiceBootstrapStep(
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+submissionSparkConf: SparkConf,
+clock: Clock) extends DriverConfigurationStep with Logging {
+  import DriverServiceBootstrapStep._
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty,
+  s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as 
the driver's bind " +
+  "address is managed and set to the driver pod's IP address.")
+require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty,
+  s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the 
driver's hostname will be " +
+  "managed via a Kubernetes service.")
+
+val preferredServiceName = 
s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX"
+val resolvedServiceName = if (preferredServiceName.length <= 
MAX_SERVICE_NAME_LENGTH) {
+  preferredServiceName
+} else {
+  val randomServiceId = clock.getTimeMillis()
+  val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX"
+  logWarning(s"Driver's hostname would preferably be 
$preferredServiceName, but this is " +
+s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " +
+s"$shorterServiceName as the driver service's name.")
+  shorterServiceName
+}
+
+val driverPort = submissionSparkConf.getInt("spark.driver.port", 
DEFAULT_DRIVER_PORT)
+val driverBlockManagerPort = submissionSparkConf.getInt(
+org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, 
DEFAULT_BLOCKMANAGER_PORT)
+val driverService = new ServiceBuilder()
+  .withNewMetadata()
+.withName(resolvedServiceName)
+.endMetadata()
+  .withNewSpec()
+.withClusterIP("None")
+.withSelector(driverLabels.asJava)
+.addNewPort()
+  .withName(DRIVER_PORT_NAME)
+  .withPort(driverPort)
+  .withNewTargetPort(driverPort)
+  .endPort()
+.addNewPort()
+  .withName(BLOCK_MANAGER_PORT_NAME)
+  .withPort(driverBlockManagerPort)
+  .withNewTargetPort(driverBlockManagerPort)
+  .endPort()
+.endSpec()
+  .build()
+
+val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE)
+val driverHostname = 
s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+val resolvedSparkConf = driverSpec.driverSparkConf.clone()
+  .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, 
driverHostname)
--- End diff --

`DRIVER_HOST_KEY`?


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154549074
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -251,6 +252,7 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
 YARN
   case m if m.startsWith("spark") => STANDALONE
   case m if m.startsWith("mesos") => MESOS
+  case m if m.startsWith("k8s") => KUBERNETES
   case m if m.startsWith("local") => LOCAL
   case _ =>
 printErrorAndExit("Master must either be yarn or start with spark, 
mesos, local")
--- End diff --

`Master must either be yarn or start with spark, mesos, k8s, local`
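
A self-contained sketch of the resolution with the suggested wording; the numeric constants and the helper wrapper are illustrative stand-ins for what `SparkSubmit` already defines:

```scala
object ClusterManagerSketch {
  // Illustrative flags; SparkSubmit defines its own.
  val YARN = 1; val STANDALONE = 2; val MESOS = 4; val LOCAL = 8; val KUBERNETES = 16

  def resolveClusterManager(master: String): Int = master match {
    case "yarn" => YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("k8s") => KUBERNETES
    case m if m.startsWith("local") => LOCAL
    case _ =>
      sys.error("Master must either be yarn or start with spark, mesos, k8s, local")
  }

  def main(args: Array[String]): Unit = {
    println(resolveClusterManager("k8s://https://example.com:6443")) // 16
  }
}
```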


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154549172
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -296,6 +298,12 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   case (STANDALONE, CLUSTER) if args.isR =>
 printErrorAndExit("Cluster deploy mode is currently not supported 
for R " +
   "applications on standalone clusters.")
+  case (KUBERNETES, CLIENT) =>
+printErrorAndExit("Client mode is currently not supported for 
Kubernetes.")
+  case (KUBERNETES, _) if args.isPython =>
+printErrorAndExit("Python applications are currently not supported 
for Kubernetes.")
+  case (KUBERNETES, _) if args.isR =>
+printErrorAndExit("R applications are currently not supported for 
Kubernetes.")
--- End diff --

nit: It does not affect the result, but logically I think this ordering is better:

```scala
case (KUBERNETES, _) if args.isPython =>
  printErrorAndExit("Python applications are currently not supported for 
Kubernetes.")
case (KUBERNETES, _) if args.isR =>
  printErrorAndExit("R applications are currently not supported for 
Kubernetes.")
case (KUBERNETES, CLIENT) =>
  printErrorAndExit("Client mode is currently not supported for 
Kubernetes.")
```


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154549733
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +604,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

There are some messages that need to be updated too, e.g.:

| Spark standalone or Mesos with cluster deploy mode only:
|  --supervise If given, restarts the driver on 
failure.
|  --kill SUBMISSION_IDIf given, kills the driver specified.
|  --status SUBMISSION_ID  If given, requests the status of the 
driver specified.

From the above, k8s supports killing submissions and requesting submission statuses.

| Spark standalone and Mesos only:
|  --total-executor-cores NUM  Total cores for all executors.

k8s also supports the `totalExecutorCores` option.




---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154553669
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile 
.
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt && \
+readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then 
SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" 
-Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY 
-Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" 
org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url 
$SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores 
$SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname 
$SPARK_EXECUTOR_POD_IP
--- End diff --

For `SPARK_EXECUTOR_PORT`, I don't see the corresponding `ENV_EXECUTOR_PORT` being set in `createExecutorPod`. Is it missing?

Ditto for `SPARK_MOUNTED_CLASSPATH`.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154537698
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -304,7 +313,9 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
   }
 
   private def validateStatusRequestArguments(): Unit = {
-if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
+if (!master.startsWith("spark://")
+  && !master.startsWith("mesos://")
+  && !master.startsWith("k8s://")) {
   SparkSubmit.printErrorAndExit(
 "Requesting submission statuses is only supported in standalone or 
Mesos mode!")
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154537691
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -294,7 +301,9 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
   }
 
   private def validateKillArguments(): Unit = {
-if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
+if (!master.startsWith("spark://")
+  && !master.startsWith("mesos://")
+  && !master.startsWith("k8s://")) {
   SparkSubmit.printErrorAndExit(
 "Killing submissions is only supported in standalone or Mesos 
mode!")
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154537632
  
--- Diff: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, 
PodBuilder}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+private[spark] class DependencyResolutionStepSuite extends SparkFunSuite {
--- End diff --

Done.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-03 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154537561
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
 "must be a positive integer")
   .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+  .doc("In cluster mode, whether to wait for the application to finish 
before exiting the " +
+"launcher process.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+ConfigBuilder("spark.kubernetes.report.interval")
+  .doc("Interval between reports of the current app status in cluster 
mode.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+  .doc("Location to download jars to in the driver and executors. When 
using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pod.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-jars")
--- End diff --

This is somewhat an implementation detail, and we expect that users normally won't need to set it or even know about it, so having a default makes sense.
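
To illustrate the "mounted as an empty directory volume" behaviour the doc string describes, a hedged fabric8 sketch; the volume name and the pod/container pairing are assumptions for illustration only:

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder, Pod, PodBuilder}

object JarsDownloadDirSketch {
  // Back the download directory with an emptyDir volume on the pod and mount
  // it into the container at the configured path.
  def mountDownloadDir(pod: Pod, container: Container, downloadDir: String): (Pod, Container) = {
    val podWithVolume = new PodBuilder(pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName("download-jars-volume")
          .withNewEmptyDir().endEmptyDir()
          .endVolume()
        .endSpec()
      .build()
    val containerWithMount = new ContainerBuilder(container)
      .addNewVolumeMount()
        .withName("download-jars-volume")
        .withMountPath(downloadDir)
        .endVolumeMount()
      .build()
    (podWithVolume, containerWithMount)
  }

  def main(args: Array[String]): Unit = {
    val (pod, container) = mountDownloadDir(
      new PodBuilder().withNewMetadata().withName("driver").endMetadata().build(),
      new ContainerBuilder().withName("spark-kubernetes-driver").build(),
      "/var/spark-data/spark-jars")
    println(pod.getSpec.getVolumes)
    println(container.getVolumeMounts)
  }
}
```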


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154502824
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
--- End diff --

I tracked down the parentage of this dependency tree and their licenses:
openjdk: MIT
alpine: BSD2
So we are OK.

We might need to list them in NOTICE, though.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503544
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
--- End diff --

There is one more step before this, I think: building Spark first. Could you add a link to http://spark.apache.org/docs/latest/building-spark.html?



---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503161
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and 
Kubernetes 

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503411
  
--- Diff: 
resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, 
PodBuilder}
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+private[spark] class DependencyResolutionStepSuite extends SparkFunSuite {
--- End diff --

don't use `private[spark]` on the test suite
could you double check Jenkins' run?
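
For illustration, a minimal sketch of the suite declaration without the modifier (everything else unchanged):

```scala
// Sketch only: drop the package-private modifier; test suites don't need it.
class DependencyResolutionStepSuite extends SparkFunSuite {
  // ... existing tests unchanged ...
}
```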


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503100
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.{SPARK_VERSION => sparkVersion}
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.network.util.ByteUnit
+
+private[spark] object Config extends Logging {
+
+  val KUBERNETES_NAMESPACE =
+ConfigBuilder("spark.kubernetes.namespace")
+  .doc("The namespace that will be used for running the driver and 
executor pods. When using " +
+"spark-submit in cluster mode, this can also be passed to 
spark-submit via the " +
+"--kubernetes-namespace command line argument.")
+  .stringConf
+  .createWithDefault("default")
+
+  val DRIVER_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.driver.docker.image")
+  .doc("Docker image to use for the driver. Specify this using the 
standard Docker tag format.")
+  .stringConf
+  .createWithDefault(s"spark-driver:$sparkVersion")
+
+  val EXECUTOR_DOCKER_IMAGE =
+ConfigBuilder("spark.kubernetes.executor.docker.image")
+  .doc("Docker image to use for the executors. Specify this using the 
standard Docker tag " +
+"format.")
+  .stringConf
+  .createWithDefault(s"spark-executor:$sparkVersion")
+
+  val DOCKER_IMAGE_PULL_POLICY =
+ConfigBuilder("spark.kubernetes.docker.image.pullPolicy")
+  .doc("Docker image pull policy when pulling any docker image in 
Kubernetes integration")
+  .stringConf
+  .createWithDefault("IfNotPresent")
+
+
+  val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver"
+  val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX =
+  "spark.kubernetes.authenticate.driver.mounted"
+  val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken"
+  val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile"
+  val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile"
+  val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile"
+  val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
+
+  val KUBERNETES_SERVICE_ACCOUNT_NAME =
+
ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+  .doc("Service account that is used when running the driver pod. The 
driver pod uses " +
+"this service account when requesting executor pods from the API 
server. If specific " +
+"credentials are given for the driver pod to use, the driver will 
favor " +
+"using those credentials instead.")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.driver.limit.cores")
+  .doc("Specify the hard cpu limit for the driver pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+ConfigBuilder("spark.kubernetes.executor.limit.cores")
+  .doc("Specify the hard cpu limit for a single executor pod")
+  .stringConf
+  .createOptional
+
+  val KUBERNETES_DRIVER_MEMORY_OVERHEAD =
+ConfigBuilder("spark.kubernetes.driver.memoryOverhead")
+  .doc("The amount of off-heap memory (in megabytes) to be allocated 
for the driver and the " +
+"driver submission server. This is memory that accounts for things 
like VM overheads, " +
+"interned strings, other native overheads, etc. This tends to grow 
with the driver's " +
+"memory size (typically 6-10%).")
+  .bytesConf(ByteUnit.MiB)
+  .createOptional
+
+  // Note that while we set a default for this when we start up the
+  // scheduler, the specific default value is 

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503612
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh
 ---
@@ -0,0 +1,37 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# echo commands to the terminal output
+set -x
--- End diff --

consider `set -ex`


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503222
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: Option[MainAppResource],
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource,
+  mainClass.get,
+  driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
+ * watcher that monitors and logs the application status. Waits for the 
application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
+ * to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
+ */
+private[spark] class Client(
+submissionSteps: Seq[DriverConfigurationStep],
+submissionSparkConf: SparkConf,
+kubernetesClient: KubernetesClient,
+waitForAppCompletion: Boolean,
+appName: String,
+loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+   /**
+* Run command that initializes a DriverSpec that will be updated after 
each
+* DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
+* will be used to build the Driver Container, Driver Pod, and 
Kubernetes 

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154502636
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -304,7 +313,9 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
   }
 
   private def validateStatusRequestArguments(): Unit = {
-if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
+if (!master.startsWith("spark://")
+  && !master.startsWith("mesos://")
+  && !master.startsWith("k8s://")) {
   SparkSubmit.printErrorAndExit(
 "Requesting submission statuses is only supported in standalone or 
Mesos mode!")
--- End diff --

add `Kubernetes` to the message
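
A minimal sketch of what the updated message could look like (wording illustrative, not final):

```scala
// Illustrative only: mention Kubernetes alongside the other cluster managers.
SparkSubmit.printErrorAndExit(
  "Requesting submission statuses is only supported in standalone, Mesos or Kubernetes mode!")
```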


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154502901
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with 
Logging {
   }
 }
 
+if (isKubernetesCluster) {
+  childMainClass = "org.apache.spark.deploy.k8s.submit.Client"
--- End diff --

let's wait for that and rebase this PR


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503509
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile ---
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM spark-base
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile .
+
+COPY examples /opt/spark/examples
+
+CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
+env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > 
/tmp/java_opts.txt && \
+readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
+if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then 
SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
+${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp 
"$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY 
-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS 
$SPARK_DRIVER_ARGS
--- End diff --

what are the core differences between the driver and executor images, other than the 
`SPARK_DRIVER*` env vars?
we could consider switching via a docker-entrypoint.sh script like the one in 
https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/docker-entrypoint.sh
and have one image instead.



---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503142
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
 "must be a positive integer")
   .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+  .doc("In cluster mode, whether to wait for the application to finish 
before exiting the " +
+"launcher process.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+ConfigBuilder("spark.kubernetes.report.interval")
+  .doc("Interval between reports of the current app status in cluster 
mode.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+  .doc("Location to download jars to in the driver and executors. When 
using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pod.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-jars")
+
+  private[spark] val FILES_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
+  .doc("Location to download files to in the driver and executors. 
When using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pods.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-files")
+
+  val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX =
+"spark.kubernetes.authenticate.submission"
+
   val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector."
+
+  val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
+  val KUBERNETES_DRIVER_ANNOTATION_PREFIX = 
"spark.kubernetes.driver.annotation."
+
+  val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label."
+  val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = 
"spark.kubernetes.executor.annotation."
+
+  val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv."
+
+  def getK8sMasterUrl(rawMasterString: String): String = {
+require(rawMasterString.startsWith("k8s://"),
+  "Master URL should start with k8s:// in Kubernetes mode.")
+val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
+if (masterWithoutK8sPrefix.startsWith("http://")
--- End diff --

btw, on a side note, should there be a log/warning when the master URL uses http: (vs https:)? I 
don't think we have that case for other cluster managers.
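
A rough sketch of what such a warning could look like inside `Config.getK8sMasterUrl` (the object already extends `Logging`); the exact wording below is an assumption:

```scala
// Sketch only: warn when the resolved API server address uses plain HTTP.
val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
if (masterWithoutK8sPrefix.startsWith("http://")) {
  logWarning(s"Kubernetes master $masterWithoutK8sPrefix uses insecure HTTP " +
    "rather than HTTPS to reach the API server.")
}
```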


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503317
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
 ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, 
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status 
logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, 
must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+appId: String,
+maybeLoggingInterval: Option[Long])
+  extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = 
pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+maybeLoggingInterval.foreach { interval =>
+  require(interval > 0, s"Logging interval must be a positive time 
value, got: $interval ms.")
+  scheduler.scheduleAtFixedRate(logRunnable, 0, interval, 
TimeUnit.MILLISECONDS)
+}
+  }
+
+  override def eventReceived(action: Action, pod: Pod): Unit = {
+this.pod = Option(pod)
+action match {
+  case Action.DELETED | Action.ERROR =>
+closeWatch()
+
+  case _ =>
+logLongStatus()
+if (hasCompleted()) {
+  closeWatch()
+}
+}
+  }
+
+  override def onClose(e: KubernetesClientException): Unit = {
+logDebug(s"Stopping watching application $appId with last-observed 
phase $phase")
+closeWatch()
+  }
+
+  private def logShortStatus() = {
+logInfo(s"Application status for $appId (phase: $phase)")
+  }
+
+  private def logLongStatus() = {
+logInfo("State changed, new state: " + 
pod.map(formatPodState).getOrElse("unknown"))
+  }
+
+  private def hasCompleted(): Boolean = {
+phase == "Succeeded" || phase == "Failed"
+  }
+
+  private def closeWatch(): Unit = {
+podCompletedFuture.countDown()
+scheduler.shutdown()
+  }
+
+  private def formatPodState(pod: Pod): String = {
+// TODO include specific container state
--- End diff --

should this be tracked with a JIRA?


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154502635
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -294,7 +301,9 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
   }
 
   private def validateKillArguments(): Unit = {
-if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
+if (!master.startsWith("spark://")
+  && !master.startsWith("mesos://")
+  && !master.startsWith("k8s://")) {
   SparkSubmit.printErrorAndExit(
 "Killing submissions is only supported in standalone or Mesos 
mode!")
--- End diff --

add `Kubernetes` to the message


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503012
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: 
Seq[String], env: Map[String, S
 |  the node running the Application 
Master via the Secure
 |  Distributed Cache, for renewing the 
login tickets and the
 |  delegation tokens periodically.
+|
+| Kubernetes only:
+|  --kubernetes-namespace NS   The namespace in the Kubernetes 
cluster within which the
+|  application must be launched. The 
namespace must already
+|  exist in the cluster. (Default: 
default).
--- End diff --

now that the docker image is without a default value 
(`spark.kubernetes.*.docker.image`) (yes, that discussion is ongoing), I wonder 
if it makes sense to bubble that up as a `--param` for visibility/convenience.

tbh I'm generally against adding `--param` options to submit because of the 
potential confusion they can cause, but since we are already here and there's a 
`--kubernetes-namespace`, it may be worth considering.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503085
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
 "must be a positive integer")
   .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+  .doc("In cluster mode, whether to wait for the application to finish 
before exiting the " +
+"launcher process.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+ConfigBuilder("spark.kubernetes.report.interval")
+  .doc("Interval between reports of the current app status in cluster 
mode.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+  .doc("Location to download jars to in the driver and executors. When 
using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pod.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-jars")
--- End diff --

does it make sense to have a default?
perhaps add a comment or doc string to explain why this value is the default?
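
As a sketch, the doc string could carry the rationale directly; the stated reasoning below is an assumption about the intent, not text from the PR:

```scala
// Sketch only: same key and default, with the doc string explaining the choice.
private[spark] val JARS_DOWNLOAD_LOCATION =
  ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
    .doc("Location to download jars to in the driver and executors. The default is a " +
      "dedicated path mounted as an empty directory volume, so staged jars never clash " +
      "with files already present in the image.")
    .stringConf
    .createWithDefault("/var/spark-data/spark-jars")
```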


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503638
  
--- Diff: 
resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile 
---
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+FROM openjdk:8-alpine
+
+# If this docker file is being used in the context of building your images 
from a Spark distribution, the docker build
+# command should be invoked from the top level directory of the Spark 
distribution. E.g.:
+# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile .
+
+RUN apk upgrade --no-cache && \
--- End diff --

consider - add `set -ex`


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-12-02 Thread felixcheung
Github user felixcheung commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154503089
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
 ---
@@ -119,5 +139,60 @@ private[spark] object Config extends Logging {
 "must be a positive integer")
   .createWithDefault(10)
 
+  val WAIT_FOR_APP_COMPLETION =
+ConfigBuilder("spark.kubernetes.submission.waitAppCompletion")
+  .doc("In cluster mode, whether to wait for the application to finish 
before exiting the " +
+"launcher process.")
+  .booleanConf
+  .createWithDefault(true)
+
+  val REPORT_INTERVAL =
+ConfigBuilder("spark.kubernetes.report.interval")
+  .doc("Interval between reports of the current app status in cluster 
mode.")
+  .timeConf(TimeUnit.MILLISECONDS)
+  .createWithDefaultString("1s")
+
+  private[spark] val JARS_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir")
+  .doc("Location to download jars to in the driver and executors. When 
using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pod.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-jars")
+
+  private[spark] val FILES_DOWNLOAD_LOCATION =
+ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir")
+  .doc("Location to download files to in the driver and executors. 
When using" +
+" spark-submit, this directory must be empty and will be mounted 
as an empty directory" +
+" volume on the driver and executor pods.")
+  .stringConf
+  .createWithDefault("/var/spark-data/spark-files")
--- End diff --

ditto (same question about the default value as for `jarsDownloadDir` above)


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154168292
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala
 ---
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, 
EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+kubernetesAppId: String,
+kubernetesResourceNamePrefix: String,
+driverLabels: Map[String, String],
+dockerImagePullPolicy: String,
+appName: String,
+mainClass: String,
+appArgs: Array[String],
+submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = 
submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+.getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = 
submissionSparkConf.get(DRIVER_DOCKER_IMAGE)
+
+  // CPU settings
+  private val driverCpuCores = 
submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = 
submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+org.apache.spark.internal.config.DRIVER_MEMORY.key,
+org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
+.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+  MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + 
memoryOverheadMiB
+
+  override def configureDriver(
+  driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+  new EnvVarBuilder()
+.withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+.withValue(classPath)
+.build()
+}
+
+val driverCustomAnnotations = ConfigurationUtils
+  .parsePrefixedKeyValuePairs(
+submissionSparkConf,
+KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+  s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as 
it is reserved for" +
+" Spark bookkeeping operations.")
+
+val driverCustomEnvs = 
submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+  .map(env => new EnvVarBuilder()
+.withName(env._1)
+.withValue(env._2)
+.build())
+
+val allDriverAnnotations = driverCustomAnnotations ++ 
Map(SPARK_APP_NAME_ANNOTATION -> appName)
+
+val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs(
+  submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)
+
+val driverCpuQuantity = new QuantityBuilder(false)
+  .withAmount(driverCpuCores)
+  .build()
+val driverMemoryQuantity = new QuantityBuilder(false)
+  .withAmount(s"${driverMemoryMiB}Mi")
+  .build()
+val 

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154158221
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
--- End diff --

Also added the missing step `DependencyResolutionStep` that is essential 
for supporting `local://` dependencies (jars and files).
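
For context, a hedged sketch of what resolving a `local://` dependency amounts to (not the PR's actual code): such URIs point at files already inside the container image, so resolution strips the scheme instead of downloading anything.

```scala
import java.net.URI

// Sketch only: map a dependency URI to the path the driver/executors should use.
def resolveFileUri(uri: String): String = {
  val parsed = URI.create(uri)
  Option(parsed.getScheme) match {
    case Some("local") => parsed.getPath  // already on the container filesystem
    case _             => uri             // other schemes are handled by the downloader
  }
}
```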


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154153331
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala
 ---
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, 
ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status 
logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, 
must be a positive
+ * number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+  appId: String, maybeLoggingInterval: Option[Long])
+extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+
ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = 
pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+maybeLoggingInterval.foreach { interval =>
+  require(interval > 0, s"Logging interval must be a positive time 
value, got: $interval ms.")
+  scheduler.scheduleAtFixedRate(logRunnable, 0, interval, 
TimeUnit.MILLISECONDS)
+}
+  }
+
+  override def eventReceived(action: Action, pod: Pod): Unit = {
+this.pod = Option(pod)
+action match {
+  case Action.DELETED =>
+closeWatch()
+
+  case Action.ERROR =>
+closeWatch()
--- End diff --

It's because in both the `DELETED` and `ERROR` cases, the application is 
considered terminated. In other cases, we need to check the phase of the 
driver pod to determine whether the application has terminated.
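
The revision quoted earlier in this thread reflects exactly that; roughly:

```scala
// DELETED/ERROR end the watch immediately; any other event only ends it
// once the pod phase reports that the application has finished.
override def eventReceived(action: Action, pod: Pod): Unit = {
  this.pod = Option(pod)
  action match {
    case Action.DELETED | Action.ERROR =>
      closeWatch()
    case _ =>
      logLongStatus()
      if (hasCompleted()) {
        closeWatch()
      }
  }
}
```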


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154143906
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+var mainAppResource: Option[MainAppResource] = None
+var mainClass: Option[String] = None
+val driverArgs = mutable.ArrayBuffer.empty[String]
+
+args.sliding(2, 2).toList.foreach {
+  case Array("--primary-java-resource", primaryJavaResource: String) =>
+mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+  case Array("--main-class", clazz: String) =>
+mainClass = Some(clazz)
+  case Array("--arg", arg: String) =>
+driverArgs += arg
+  case other =>
+val invalid = other.mkString(" ")
+throw new RuntimeException(s"Unknown arguments: $invalid")
+}
+
+require(mainClass.isDefined, "Main class must be specified via 
--main-class")
+
+ClientArguments(
+  mainAppResource.get,
--- End diff --

As @mridulm pointed out above, `mainAppResource` may be empty in cases such as the Spark 
examples, the Thrift server, etc. `--primary-java-resource` is set only if 
`mainAppResource != SparkLauncher.NO_RESOURCE` in `SparkSubmit`, so it could be 
empty here. I changed `ClientArguments` to take an `Option[MainAppResource]` 
instead.
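
A hedged sketch of the producing side under that change, assuming `SparkLauncher.NO_RESOURCE` marks the no-resource case as described above: the `--primary-java-resource` flag is simply omitted when there is nothing to pass.

```scala
// Sketch only: build the child arguments handed to the submission client.
def toChildArgs(
    primaryJavaResource: Option[String],
    mainClass: String,
    appArgs: Seq[String]): Seq[String] = {
  primaryJavaResource.toSeq.flatMap(r => Seq("--primary-java-resource", r)) ++
    Seq("--main-class", mainClass) ++
    appArgs.flatMap(a => Seq("--arg", a))
}
```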


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154134373
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, 
Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use 
such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = 
submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a 
file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a 
file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a 
file.")
+
+val driverSparkConfWithCredentialsLocations = 
setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = 
kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154134483
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, 
Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use 
such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+submissionSparkConf: SparkConf,
+kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+  
s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = 
submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): 
KubernetesDriverSpec = {
+val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+val oauthTokenBase64 = submissionSparkConf
+
.getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+.map { token =>
+  
BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+}
+val caCertDataBase64 = safeFileConfToBase64(
+s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+"Driver CA cert file provided at %s does not exist or is not a 
file.")
+val clientKeyDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+"Driver client key file provided at %s does not exist or is not a 
file.")
+val clientCertDataBase64 = safeFileConfToBase64(
+
s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+"Driver client cert file provided at %s does not exist or is not a 
file.")
+
+val driverSparkConfWithCredentialsLocations = 
setDriverPodKubernetesCredentialLocations(
+driverSparkConf,
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val kubernetesCredentialsSecret = createCredentialsSecret(
+oauthTokenBase64,
+caCertDataBase64,
+clientKeyDataBase64,
+clientCertDataBase64)
+
+val driverPodWithMountedKubernetesCredentials = 
kubernetesCredentialsSecret.map { secret =>
+  new PodBuilder(driverSpec.driverPod)
+.editOrNewSpec()
+  .addNewVolume()
+.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+
.withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+.endVolume()
+  .endSpec()
+.build()
+}.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread liyinan926
Github user liyinan926 commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154134088
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesClientFactory.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import io.fabric8.kubernetes.client.{ConfigBuilder, 
DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import okhttp3.Dispatcher
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus 
common suffixes to
+ * parse configuration keys, similar to the manner in which Spark's 
SecurityManager parses SSL
+ * options for different components.
+ */
+private[spark] object KubernetesClientFactory {
--- End diff --

Ah, it's duplicated. Removed this class.


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154016042
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesClientFactory.scala
 ---
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import io.fabric8.kubernetes.client.{ConfigBuilder, 
DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.utils.HttpClientUtils
+import okhttp3.Dispatcher
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to
+ * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL
+ * options for different components.
+ */
+private[spark] object KubernetesClientFactory {
--- End diff --

What's the difference between this and `SparkKubernetesClientFactory`?


---




[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154029412
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+    submissionSparkConf: SparkConf,
+    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+    val oauthTokenBase64 = submissionSparkConf
+      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+      .map { token =>
+        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+      }
+    val caCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+      "Driver CA cert file provided at %s does not exist or is not a file.")
+    val clientKeyDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+      "Driver client key file provided at %s does not exist or is not a file.")
+    val clientCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+      "Driver client cert file provided at %s does not exist or is not a file.")
+
+    val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+      driverSparkConf,
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val kubernetesCredentialsSecret = createCredentialsSecret(
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+      new PodBuilder(driverSpec.driverPod)
+        .editOrNewSpec()
+          .addNewVolume()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+            .endVolume()
+          .endSpec()
+        .build()
+    }.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154029577
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala
 ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+    submissionSparkConf: SparkConf,
+    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+    val oauthTokenBase64 = submissionSparkConf
+      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+      .map { token =>
+        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+      }
+    val caCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+      "Driver CA cert file provided at %s does not exist or is not a file.")
+    val clientKeyDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+      "Driver client key file provided at %s does not exist or is not a file.")
+    val clientCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+      "Driver client cert file provided at %s does not exist or is not a file.")
+
+    val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+      driverSparkConf,
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val kubernetesCredentialsSecret = createCredentialsSecret(
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+      new PodBuilder(driverSpec.driverPod)
+        .editOrNewSpec()
+          .addNewVolume()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+            .endVolume()
+          .endSpec()
+        .build()
+    }.getOrElse(
+  

[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...

2017-11-30 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/19717#discussion_r154022151
  
--- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+ mainAppResource: MainAppResource,
+ mainClass: String,
+ driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+    var mainAppResource: Option[MainAppResource] = None
+    var mainClass: Option[String] = None
+    val driverArgs = mutable.ArrayBuffer.empty[String]
+
+    args.sliding(2, 2).toList.foreach {
+      case Array("--primary-java-resource", primaryJavaResource: String) =>
+        mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+      case Array("--main-class", clazz: String) =>
+        mainClass = Some(clazz)
+      case Array("--arg", arg: String) =>
+        driverArgs += arg
+      case other =>
+        val invalid = other.mkString(" ")
+        throw new RuntimeException(s"Unknown arguments: $invalid")
+    }
+
+    require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+    ClientArguments(
+      mainAppResource.get,
--- End diff --

Don't we need to check whether `mainAppResource` is defined before reaching this line? 
Otherwise `mainAppResource.get` will throw a `NoSuchElementException` if it is not set.
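
For illustration only, a minimal, self-contained sketch of the failure mode and the kind of guard being suggested, mirroring the existing `require` for `mainClass`. This is not code from this PR; the stand-in types and the error message text are made up:

```scala
// Hypothetical sketch: MainAppResource/JavaMainAppResource stand in for the PR's types.
object MainAppResourceCheckSketch {
  sealed trait MainAppResource
  case class JavaMainAppResource(primaryResource: String) extends MainAppResource

  def main(args: Array[String]): Unit = {
    // Simulate the case where --primary-java-resource was never passed.
    val mainAppResource: Option[MainAppResource] = None

    // Without a guard, calling .get on None throws
    // java.util.NoSuchElementException: None.get
    // val resource = mainAppResource.get

    // Suggested guard (hypothetical message text), so the failure is explicit:
    require(mainAppResource.isDefined,
      "Main app resource must be specified via --primary-java-resource")
  }
}
```

Running this as written fails the `require` with an `IllegalArgumentException` carrying a clear message, rather than a bare `None.get` error surfacing from deep inside the submission client.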


---



