[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154801257

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+    submissionSparkConf: SparkConf,
+    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+    val oauthTokenBase64 = submissionSparkConf
+      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+      .map { token =>
+        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+      }
+    val caCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+      "Driver CA cert file provided at %s does not exist or is not a file.")
+    val clientKeyDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+      "Driver client key file provided at %s does not exist or is not a file.")
+    val clientCertDataBase64 = safeFileConfToBase64(
--- End diff --

/cc @mccheah.
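The quote above cuts off right at the third `safeFileConfToBase64` call, and the helper itself is not shown. Judging from its call sites (a configuration key plus a printf-style error message), it presumably reads the configured path, validates it, and base64-encodes the file contents for storage in a Kubernetes Secret. A minimal sketch under that assumption, not the code under review:

    // Hypothetical sketch of safeFileConfToBase64; the body is inferred from the call sites.
    private def safeFileConfToBase64(
        conf: String,
        fileNotFoundFormatString: String): Option[String] = {
      submissionSparkConf.getOption(conf)
        .map(new File(_))
        .map { file =>
          // Fail fast if the configured path is not a regular file.
          require(file.isFile, String.format(fileNotFoundFormatString, file.getAbsolutePath))
          // Base64-encode the raw bytes so they can be placed in a Secret's data map.
          BaseEncoding.base64().encode(Files.toByteArray(file))
        }
    }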
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154800789

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
[... license header, imports, and class preamble identical to the quote above ...]
+    val clientCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+      "Driver client cert file provided at %s does not exist or is not a file.")
+
+    val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+      driverSparkConf,
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val kubernetesCredentialsSecret = createCredentialsSecret(
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+      new PodBuilder(driverSpec.driverPod)
+        .editOrNewSpec()
+          .addNewVolume()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+            .endVolume()
+          .endSpec()
+        .build()
+    }.getOrElse(
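This message is truncated mid-quote, but the visible part shows the credentials Secret being attached as a pod volume. The natural counterpart, not visible here, is mounting that volume into the driver container. A sketch in the same fabric8 builder style, where DRIVER_CREDENTIALS_SECRETS_BASE_DIR is an assumed constant for the mount path:

    // Sketch only: mount the secret volume declared above into the driver container.
    val driverContainerWithMountedSecretVolume = new ContainerBuilder(driverSpec.driverContainer)
      .addNewVolumeMount()
        // Must match the volume name added to the pod spec.
        .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
        // Assumed constant: whatever path the driver later reads the credentials from.
        .withMountPath(DRIVER_CREDENTIALS_SECRETS_BASE_DIR)
        .endVolumeMount()
      .build()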
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154800800

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
[... diff context identical to the previous message; truncated before the review comment ...]
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154800815

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala ---
@@ -0,0 +1,68 @@
[... Apache license header ...]
+package org.apache.spark.deploy.k8s.submit
+
+import java.io.File
+
+import org.apache.spark.util.Utils
+
+private[spark] object KubernetesFileUtils {
+
+  /**
+   * For the given collection of file URIs, resolves them as follows:
+   * - File URIs with scheme file:// are resolved to the given download path.
+   * - File URIs with scheme local:// resolve to just the path of the URI.
+   * - Otherwise, the URIs are returned as-is.
+   */
+  def resolveSubmittedUris(
+      fileUris: Iterable[String],
+      fileDownloadPath: String): Iterable[String] = {
+    fileUris.map { uri =>
+      val fileUri = Utils.resolveURI(uri)
+      val fileScheme = Option(fileUri.getScheme).getOrElse("file")
+      fileScheme match {
--- End diff --

Done.
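The quote stops at the `match` itself. Given the scaladoc just above it, the three cases plausibly look like the following sketch, an illustration of the documented behavior rather than necessarily the PR's exact code:

    fileScheme match {
      // file:// URIs are rewritten to point into the container-local download directory.
      case "file" =>
        val fileName = new File(fileUri.getPath).getName
        s"$fileDownloadPath/$fileName"
      // local:// URIs already name a path inside the container image; keep just the path.
      case "local" =>
        fileUri.getPath
      // Remote URIs (hdfs://, http://, ...) are returned as-is.
      case _ =>
        uri
    }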
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154800754

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
[... Apache license header ...]
+package org.apache.spark.deploy.k8s.submit.steps
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder}
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Represents the initial setup required for the driver.
+ */
+private[spark] class BaseDriverConfigurationStep(
+    kubernetesAppId: String,
+    kubernetesResourceNamePrefix: String,
+    driverLabels: Map[String, String],
+    dockerImagePullPolicy: String,
+    appName: String,
+    mainClass: String,
+    appArgs: Array[String],
+    submissionSparkConf: SparkConf) extends DriverConfigurationStep {
+
+  private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME)
+    .getOrElse(s"$kubernetesResourceNamePrefix-driver")
+
+  private val driverExtraClasspath = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_CLASS_PATH)
+
+  private val driverDockerImage = submissionSparkConf
+    .get(DRIVER_DOCKER_IMAGE)
+    .getOrElse(throw new SparkException("Must specify the driver Docker image"))
+
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
+  private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
+
+  // Memory settings
+  private val driverMemoryMiB = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_MEMORY)
+  private val driverMemoryString = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_MEMORY.key,
+    org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString)
+  private val memoryOverheadMiB = submissionSparkConf
--- End diff --

Merged both into `spark.driver.memoryOverhead` and used it in both yarn and k8s.
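For reference, a sketch of what such a unified entry could look like, together with the fallback arithmetic visible in later quotes of this file. The key name comes from the comment above; the builder details, the 0.1 factor, and the 384 MiB floor are assumptions about the usual defaults:

    import org.apache.spark.internal.config.ConfigBuilder
    import org.apache.spark.network.util.ByteUnit

    // Sketch of a unified overhead entry shared by YARN and Kubernetes.
    val DRIVER_MEMORY_OVERHEAD = ConfigBuilder("spark.driver.memoryOverhead")
      .doc("Amount of non-heap memory to be allocated per driver in cluster mode, in MiB " +
        "unless otherwise specified.")
      .bytesConf(ByteUnit.MiB)
      .createOptional

    // Fallback when the overhead is not set explicitly: max(factor * driver memory, floor).
    // Assuming a 0.1 factor and a 384 MiB floor, a 1g driver gets
    // max((0.1 * 1024).toInt, 384) = 384 MiB of overhead, i.e. a 1408 MiB container request.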
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154800774

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
[... same context as the previous message, continuing past the memory settings ...]
+  private val memoryOverheadMiB = submissionSparkConf
+    .get(KUBERNETES_DRIVER_MEMORY_OVERHEAD)
+    .getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt,
+      MEMORY_OVERHEAD_MIN_MIB))
+  private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB
+
+  override def configureDriver(
+      driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
+      new EnvVarBuilder()
+        .withName(ENV_SUBMIT_EXTRA_CLASSPATH)
+        .withValue(classPath)
+        .build()
+    }
+
+    val driverCustomAnnotations = ConfigurationUtils
+      .parsePrefixedKeyValuePairs(
+        submissionSparkConf,
+        KUBERNETES_DRIVER_ANNOTATION_PREFIX)
+    require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION),
+      s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" +
+        " Spark bookkeeping operations.")
+
+    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+      .map(env =>
--- End diff --

Done.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154800102

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
[... same context as quoted earlier in this thread ...]
+  // CPU settings
+  private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1")
--- End diff --

Agreed. But it seems `spark.driver.cores` is used in a lot of places, so unifying all of them needs a separate PR. It is also worth pointing out that the value of `spark.driver.cores` is used to set the CPU request, which in Kubernetes can be fractional, e.g., `0.1` or `100m`. `DRIVER_CORES` in YARN, however, only accepts integer values.
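To make the fractional-CPU point concrete: on Kubernetes the cores value is handed to the API server as a resource quantity string, so millicore values are legal. A sketch using the QuantityBuilder already imported in this file:

    // "0.1" and "100m" both mean one tenth of a CPU to the Kubernetes scheduler.
    val driverCpuQuantity = new QuantityBuilder(false)
      .withAmount(driverCpuCores)
      .build()
    // YARN parses spark.driver.cores as an integer, so a value like "0.1" that is
    // valid here would be rejected there, hence the suggestion of a separate unification PR.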
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154783790

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
[... same context as quoted earlier in this thread ...]
+  override def configureDriver(
+      driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
--- End diff --

Done.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154783519

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
[... same context as quoted earlier in this thread ...]
+  private val driverExtraClasspath = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_CLASS_PATH)
--- End diff --

Done.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154782915

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,234 @@
[... Apache license header ...]
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+    mainAppResource: Option[MainAppResource],
+    mainClass: String,
+    driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+    var mainAppResource: Option[MainAppResource] = None
+    var mainClass: Option[String] = None
+    val driverArgs = mutable.ArrayBuffer.empty[String]
+
+    args.sliding(2, 2).toList.foreach {
+      case Array("--primary-java-resource", primaryJavaResource: String) =>
+        mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+      case Array("--main-class", clazz: String) =>
+        mainClass = Some(clazz)
+      case Array("--arg", arg: String) =>
+        driverArgs += arg
+      case other =>
+        val invalid = other.mkString(" ")
+        throw new RuntimeException(s"Unknown arguments: $invalid")
+    }
+
+    require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+    ClientArguments(
+      mainAppResource,
+      mainClass.get,
+      driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ *                             to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
+private[spark] class Client(
+    submissionSteps: Seq[DriverConfigurationStep],
+    submissionSparkConf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    waitForAppCompletion: Boolean,
+    appName: String,
+    loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+  /**
+   * Run command that initializes a DriverSpec that will be updated after each
+   * DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
+   * will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
+   */
+  def run(): Unit =
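Since `fromCommandLineArgs` walks the arguments with `args.sliding(2, 2)`, every flag must be immediately followed by its value, and `--arg` may repeat. An illustrative invocation (the values are made up):

    val parsed = ClientArguments.fromCommandLineArgs(Array(
      "--primary-java-resource", "local:///opt/spark/examples/jars/spark-examples.jar",
      "--main-class", "org.apache.spark.examples.SparkPi",
      "--arg", "10"))
    // parsed.mainAppResource contains a JavaMainAppResource for the jar,
    // parsed.mainClass == "org.apache.spark.examples.SparkPi",
    // parsed.driverArgs.toSeq == Seq("10").
    // A dangling flag with no value, or an unknown flag, throws RuntimeException.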
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154782933

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ---
@@ -0,0 +1,181 @@
[... Apache license header ...]
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+
+import org.apache.spark.SparkException
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.ThreadUtils
+
+private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] {
+  def awaitCompletion(): Unit
+}
+
+/**
+ * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on
+ * every state change and also at an interval for liveness.
+ *
+ * @param appId application ID.
+ * @param maybeLoggingInterval ms between each state request. If provided, must be a positive
+ *                             number.
+ */
+private[k8s] class LoggingPodStatusWatcherImpl(
+    appId: String,
+    maybeLoggingInterval: Option[Long])
+  extends LoggingPodStatusWatcher with Logging {
+
+  private val podCompletedFuture = new CountDownLatch(1)
+  // start timer for periodic logging
+  private val scheduler =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher")
+  private val logRunnable: Runnable = new Runnable {
+    override def run() = logShortStatus()
+  }
+
+  private var pod = Option.empty[Pod]
+
+  private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown")
+
+  def start(): Unit = {
+    maybeLoggingInterval.foreach { interval =>
+      require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
--- End diff --

Done.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154782904

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala ---
@@ -55,14 +63,35 @@ private[spark] object Config extends Logging {
   val CA_CERT_FILE_CONF_SUFFIX = "caCertFile"
 
   val KUBERNETES_SERVICE_ACCOUNT_NAME =
-    ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
+    ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName")
       .doc("Service account that is used when running the driver pod. The driver pod uses " +
         "this service account when requesting executor pods from the API server. If specific " +
         "credentials are given for the driver pod to use, the driver will favor " +
         "using those credentials instead.")
       .stringConf
       .createOptional
 
+  val KUBERNETES_DRIVER_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.driver.limit.cores")
+      .doc("Specify the hard cpu limit for the driver pod")
+      .stringConf
+      .createOptional
+
+  val KUBERNETES_EXECUTOR_LIMIT_CORES =
+    ConfigBuilder("spark.kubernetes.executor.limit.cores")
+      .doc("Specify the hard cpu limit for a single executor pod")
--- End diff --

Done.
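Both entries are optional string configs, so they accept Kubernetes quantity syntax directly. A hypothetical way to set them at submission time:

    val conf = new SparkConf()
      // Hard-cap the driver pod at one full CPU.
      .set("spark.kubernetes.driver.limit.cores", "1")
      // Hard-cap each executor pod at half a CPU (500 millicores).
      .set("spark.kubernetes.executor.limit.cores", "500m")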
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154759879

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
[... same context as quoted earlier in this thread ...]
+    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
+      .map(env =>
--- End diff --

`.map { env =>`
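That is, braces rather than parentheses for a multi-line closure, per Spark's style. A sketch of the reformatted call, where the closure body is a guess at what the mapped value looks like:

    val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq
      .map { env =>
        // Build one container environment variable per spark.kubernetes.driverEnv.* entry.
        new EnvVarBuilder()
          .withName(env._1)
          .withValue(env._2)
          .build()
      }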
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154749355

--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         | the node running the Application Master via the Secure
         | Distributed Cache, for renewing the login tickets and the
         | delegation tokens periodically.
+        |
+        | Kubernetes only:
+        |  --kubernetes-namespace NS  The namespace in the Kubernetes cluster within which the
--- End diff --

Is it a big deal to not add this as a command line arg and force people to use the configuration instead? I'd prefer not to add even more cluster-specific switches to `SparkSubmit`, at least not until it is refactored to be pluggable.
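The configuration-only route would presumably look like `spark-submit ... --conf spark.kubernetes.namespace=default`, which needs no new switch in `SparkSubmit` at all: the Kubernetes backend can read the key from the SparkConf it already receives. (The key name here is taken from the PR's Config object; treat the invocation as illustrative.)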
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154759045

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala ---
@@ -0,0 +1,164 @@
[... same context as quoted earlier in this thread ...]
+  private val driverExtraClasspath = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_CLASS_PATH)
--- End diff --

Why use the full path all over the place instead of importing?
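Concretely, the suggestion is:

    // Import the config entry once at the top of the file...
    import org.apache.spark.internal.config.DRIVER_CLASS_PATH

    // ...then reference it by its short name at each use site.
    private val driverExtraClasspath = submissionSparkConf.get(DRIVER_CLASS_PATH)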
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154757417

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala ---
@@ -0,0 +1,181 @@
[... same context as quoted earlier in this thread ...]
+  def start(): Unit = {
+    maybeLoggingInterval.foreach { interval =>
+      require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.")
--- End diff --

You could use `.checkValue` in the constant declaration instead.
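That is, validate once where the config entry is declared rather than at every use. A sketch, assuming the interval comes from a time-typed config entry (the key name and default are assumptions):

    import java.util.concurrent.TimeUnit

    import org.apache.spark.internal.config.ConfigBuilder

    val REPORT_INTERVAL = ConfigBuilder("spark.kubernetes.report.interval")
      .doc("Interval between reports of the current app status in cluster mode.")
      .timeConf(TimeUnit.MILLISECONDS)
      // Rejects non-positive values at load time, so the watcher never sees them.
      .checkValue(_ > 0, "Logging interval must be a positive time value.")
      .createWithDefaultString("1s")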
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154755221

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/DriverConfigurationStepsOrchestrator.scala ---
@@ -0,0 +1,121 @@
[... Apache license header ...]
+package org.apache.spark.deploy.k8s.submit
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.ConfigurationUtils
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.steps._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.SystemClock
+
+/**
+ * Constructs the complete list of driver configuration steps to run to deploy the Spark driver.
+ */
+private[spark] class DriverConfigurationStepsOrchestrator(
+    namespace: String,
+    kubernetesAppId: String,
+    launchTime: Long,
+    mainAppResource: Option[MainAppResource],
+    appName: String,
+    mainClass: String,
+    appArgs: Array[String],
+    submissionSparkConf: SparkConf) {
+
+  // The resource name prefix is derived from the application name, making it easy to connect the
+  // names of the Kubernetes resources from e.g. kubectl or the Kubernetes dashboard to the
+  // application the user submitted. However, we can't use the application name in the label, as
+  // label values are considerably restrictive, e.g. must be no longer than 63 characters in
+  // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
+  // requires finding "all pods for this application" should use the kubernetesAppId.
+  private val kubernetesResourceNamePrefix =
--- End diff --

Is this required to be unique? If so, this could use some more uniqueness (e.g. using a UUID instead of the "launch time").
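If the prefix does need to be unique, the launch-time suffix could be swapped for a random UUID along these lines (a sketch of the suggestion, not code from the PR):

    import java.util.UUID

    // A random 32-hex-character suffix is label-safe and far less likely to collide
    // than a millisecond timestamp when two apps with the same name launch together.
    private val kubernetesResourceNamePrefix = {
      val uuid = UUID.randomUUID().toString.replaceAll("-", "")
      s"$appName-$uuid".toLowerCase.replaceAll("\\.", "-")
    }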
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r154771100

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
[... diff context identical to the earlier quotes of this file; truncated before the review comment ...]
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154770325 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154754517 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build the Driver Container, Driver Pod, and Kubernetes Resources +*/ + def run(): Unit = {
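The sliding(2, 2) parsing in ClientArguments above is easiest to see with a concrete invocation. A minimal illustration (the jar path and argument values here are made up):

    // Hypothetical invocation showing how argv entries are consumed in pairs.
    val parsed = ClientArguments.fromCommandLineArgs(Array(
      "--primary-java-resource", "local:///opt/app.jar",
      "--main-class", "org.SomeClass",
      "--arg", "arg1",
      "--arg", "arg2"))
    // parsed.mainAppResource == Some(JavaMainAppResource("local:///opt/app.jar"))
    // parsed.mainClass == "org.SomeClass"
    // parsed.driverArgs.toSeq == Seq("arg1", "arg2")

Note that sliding(2, 2) emits a trailing one-element group for odd-length input, which falls through to the "Unknown arguments" error rather than being silently dropped.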
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154770165 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154750457 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -55,14 +63,35 @@ private[spark] object Config extends Logging { val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" val KUBERNETES_SERVICE_ACCOUNT_NAME = -ConfigBuilder(s"$APISERVER_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") .doc("Service account that is used when running the driver pod. The driver pod uses " + "this service account when requesting executor pods from the API server. If specific " + "credentials are given for the driver pod to use, the driver will favor " + "using those credentials instead.") .stringConf .createOptional + val KUBERNETES_DRIVER_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.driver.limit.cores") + .doc("Specify the hard cpu limit for the driver pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") --- End diff -- s/a single/each --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
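Applied, the suggestion is a one-word change to the doc string:

    val KUBERNETES_EXECUTOR_LIMIT_CORES =
      ConfigBuilder("spark.kubernetes.executor.limit.cores")
        .doc("Specify the hard cpu limit for each executor pod")
        .stringConf
        .createOptional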
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154771178 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154759574 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) + + private val driverDockerImage = submissionSparkConf +.get(DRIVER_DOCKER_IMAGE) +.getOrElse(throw new SparkException("Must specify the driver Docker image")) + + // CPU settings + private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1") + private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES) + + // Memory settings + private val driverMemoryMiB = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY) + private val driverMemoryString = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY.key, +org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString) + private val memoryOverheadMiB = submissionSparkConf --- End diff -- Another config that could use some re-factoring so that YARN and k8s use the same logic. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
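The duplicated formula is small enough that the suggested shared helper is easy to sketch (illustrative only; where it would live in core and what it would be called are open questions):

    // Overhead logic common to YARN and k8s: a fixed fraction of the heap,
    // with a floor. Mirrors the math.max expression in the diff above.
    def memoryOverheadMiB(memoryMiB: Long, factor: Double, minMiB: Long): Long =
      math.max((factor * memoryMiB).toLong, minMiB)

For example, with a 0.10 factor and a 384 MiB floor, a 4096 MiB driver would get 409 MiB of overhead; below 3840 MiB the floor wins.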
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154759746 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) + + private val driverDockerImage = submissionSparkConf +.get(DRIVER_DOCKER_IMAGE) +.getOrElse(throw new SparkException("Must specify the driver Docker image")) + + // CPU settings + private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1") + private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES) + + // Memory settings + private val driverMemoryMiB = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY) + private val driverMemoryString = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY.key, +org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString) + private val memoryOverheadMiB = submissionSparkConf +.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB + + override def configureDriver( + driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { --- End diff -- Fits in the previous line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
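That is, the collapsed form already used by DriverKubernetesCredentialsStep elsewhere in this thread:

    override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {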
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154759371 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) + + private val driverDockerImage = submissionSparkConf +.get(DRIVER_DOCKER_IMAGE) +.getOrElse(throw new SparkException("Must specify the driver Docker image")) + + // CPU settings + private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1") --- End diff -- You could move the `DRIVER_CORES` config from the YARN module to core. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
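A sketch of what the moved definition might look like in core's config package (assuming the YARN entry carries over essentially unchanged; the default of 1 matches the getOrElse fallback in the diff above):

    private[spark] val DRIVER_CORES = ConfigBuilder("spark.driver.cores")
      .doc("Number of cores to use for the driver process.")
      .intConf
      .createWithDefault(1)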
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154750025 --- Diff: core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala --- @@ -388,6 +388,32 @@ class SparkSubmitSuite conf.get("spark.ui.enabled") should be ("false") } + test("handles k8s cluster mode") { +val clArgs = Seq( + "--deploy-mode", "cluster", + "--master", "k8s://host:port", + "--executor-memory", "5g", + "--class", "org.SomeClass", + "--kubernetes-namespace", "foo", + "--driver-memory", "4g", + "--conf", "spark.kubernetes.driver.docker.image=bar", + "/home/thejar.jar", + "arg1") +val appArgs = new SparkSubmitArguments(clArgs) +val (childArgs, classpath, conf, mainClass) = prepareSubmitEnvironment(appArgs) + +val childArgsMap = childArgs.grouped(2).map(a => a(0) -> a(1)).toMap +childArgsMap.get("--primary-java-resource") should be (Some("file:/home/thejar.jar")) +childArgsMap.get("--main-class") should be (Some("org.SomeClass")) +childArgsMap.get("--arg") should be (Some("arg1")) +mainClass should be ("org.apache.spark.deploy.k8s.submit.Client") +classpath should have length (0) +conf.get("spark.executor.memory") should be ("5g") --- End diff -- Check `spark.master` too? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
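The extra assertion would be a one-liner. The expected value below assumes the master URL resolution defaults to https when no scheme is given (see the getK8sMasterUrl discussion later in this thread), so the exact string may need adjusting:

    conf.get("spark.master") should be ("k8s://https://host:port")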
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154771774 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials + * to request executors. + */ +private[spark] class DriverKubernetesCredentialsStep( +submissionSparkConf: SparkConf, +kubernetesResourceNamePrefix: String) extends DriverConfigurationStep { + + private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") + private val maybeMountedClientKeyFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") + private val maybeMountedClientCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") + private val maybeMountedCaCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") + private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverSparkConf = driverSpec.driverSparkConf.clone() + +val oauthTokenBase64 = submissionSparkConf + .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") +.map { token => + BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8)) +} +val caCertDataBase64 = safeFileConfToBase64( +s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", +"Driver CA cert file provided at %s does not exist or is not a file.") +val clientKeyDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", +"Driver client key file provided at %s does not exist or is not a file.") +val clientCertDataBase64 = safeFileConfToBase64( --- End diff -- Not really familiar with how these things are used by k8s, but don't these certs generally have 
passwords? I can't seem to find anything related to passwords for these things. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
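For context, the body of safeFileConfToBase64 falls outside the quoted hunk. A sketch of what it presumably does, judging by the Guava Files/BaseEncoding imports and the "%s does not exist or is not a file" message format; note it would contain no password or passphrase handling, consistent with the observation above:

    private def safeFileConfToBase64(
        conf: String,
        fileNotFoundFormatString: String): Option[String] =
      submissionSparkConf.getOption(conf)
        .map(new File(_))
        .map { file =>
          // Validate the configured path, then base64-encode the raw bytes.
          require(file.isFile, String.format(fileNotFoundFormatString, file.getAbsolutePath))
          BaseEncoding.base64().encode(Files.toByteArray(file))
        }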
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154751082 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -119,5 +139,60 @@ private[spark] object Config extends Logging { "must be a positive integer") .createWithDefault(10) + val WAIT_FOR_APP_COMPLETION = +ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the " + +"launcher process.") + .booleanConf + .createWithDefault(true) + + val REPORT_INTERVAL = +ConfigBuilder("spark.kubernetes.report.interval") + .doc("Interval between reports of the current app status in cluster mode.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val JARS_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir") + .doc("Location to download jars to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-jars") --- End diff -- The doc string says "download jars to". Is it guaranteed that this directory will be writable? Generally only root can write to things in "/var" by default, and I assume you're not running things as root even if it's inside a container. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
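On the writability concern: the doc string's "mounted as an empty directory volume" is the key detail. An emptyDir volume is created fresh per pod and is generally writable by the pod's containers regardless of what the image ships at /var, so the download path need not rely on image-level permissions. A sketch in the same fabric8 builder style used elsewhere in this PR (the volume name, pod, container, and path values are illustrative):

    // Declare a pod-level emptyDir volume at the configured download dir...
    val podWithJarsDir = new PodBuilder(pod)
      .editOrNewSpec()
        .addNewVolume()
          .withName("download-jars-volume")
          .withNewEmptyDir().endEmptyDir()
        .endVolume()
      .endSpec()
      .build()

    // ...and mount it into the container that does the downloading.
    val containerWithJarsDir = new ContainerBuilder(driverContainer)
      .addNewVolumeMount()
        .withName("download-jars-volume")
        .withMountPath("/var/spark-data/spark-jars")
      .endVolumeMount()
      .build()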
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154756363 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesFileUtils.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.io.File + +import org.apache.spark.util.Utils + +private[spark] object KubernetesFileUtils { + + /** + * For the given collection of file URIs, resolves them as follows: + * - File URIs with scheme file:// are resolved to the given download path. + * - File URIs with scheme local:// resolve to just the path of the URI. + * - Otherwise, the URIs are returned as-is. + */ + def resolveSubmittedUris( + fileUris: Iterable[String], + fileDownloadPath: String): Iterable[String] = { +fileUris.map { uri => + val fileUri = Utils.resolveURI(uri) + val fileScheme = Option(fileUri.getScheme).getOrElse("file") + fileScheme match { --- End diff -- This looks a lot like `resolveFilePaths`, you could probably merge the two in some way. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
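A sketch of the suggested merge: factor the per-URI resolution into one helper and let both resolveSubmittedUris and resolveFilePaths delegate to it (names illustrative; the behavior follows the scaladoc above):

    private def resolveFileUri(uri: String, fileDownloadPath: String): String = {
      val fileUri = Utils.resolveURI(uri)
      Option(fileUri.getScheme).getOrElse("file") match {
        case "file" =>
          // Submitted from the client machine: rewrite to the in-pod download dir.
          s"$fileDownloadPath/${new File(fileUri.getPath).getName}"
        case "local" =>
          // Already present on the image: use the path as-is.
          fileUri.getPath
        case _ => uri
      }
    }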
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154746008 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +if (isKubernetesCluster) { + childMainClass = "org.apache.spark.deploy.k8s.submit.Client" --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154726920 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile --- @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-alpine --- End diff -- Seems openjdk is under GPL2. Should they be listed following `This product optionally depends on 'Webbit'` in a similar format and with the license files included under `license`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154724747 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile --- @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-alpine + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile . + +RUN apk upgrade --no-cache && \ --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154721032 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils + +private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { + def awaitCompletion(): Unit +} + +/** + * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on + * every state change and also at an interval for liveness. + * + * @param appId application ID. + * @param maybeLoggingInterval ms between each state request. If provided, must be a positive + * number. 
+ */ +private[k8s] class LoggingPodStatusWatcherImpl( +appId: String, +maybeLoggingInterval: Option[Long]) + extends LoggingPodStatusWatcher with Logging { + + private val podCompletedFuture = new CountDownLatch(1) + // start timer for periodic logging + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") + private val logRunnable: Runnable = new Runnable { +override def run() = logShortStatus() + } + + private var pod = Option.empty[Pod] + + private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + + def start(): Unit = { +maybeLoggingInterval.foreach { interval => + require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.") + scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) +} + } + + override def eventReceived(action: Action, pod: Pod): Unit = { +this.pod = Option(pod) +action match { + case Action.DELETED | Action.ERROR => +closeWatch() + + case _ => +logLongStatus() +if (hasCompleted()) { + closeWatch() +} +} + } + + override def onClose(e: KubernetesClientException): Unit = { +logDebug(s"Stopping watching application $appId with last-observed phase $phase") +closeWatch() + } + + private def logShortStatus() = { +logInfo(s"Application status for $appId (phase: $phase)") + } + + private def logLongStatus() = { +logInfo("State changed, new state: " + pod.map(formatPodState).getOrElse("unknown")) + } + + private def hasCompleted(): Boolean = { +phase == "Succeeded" || phase == "Failed" + } + + private def closeWatch(): Unit = { +podCompletedFuture.countDown() +scheduler.shutdown() + } + + private def formatPodState(pod: Pod): String = { +// TODO include specific container state --- End diff -- Actually it already includes the `containerStatuses`. Removed this TODO. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154719522 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile --- @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-alpine + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile . --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154707798 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.util.concurrent.TimeUnit + +import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit + +private[spark] object Config extends Logging { + + val KUBERNETES_NAMESPACE = +ConfigBuilder("spark.kubernetes.namespace") + .doc("The namespace that will be used for running the driver and executor pods. When using " + +"spark-submit in cluster mode, this can also be passed to spark-submit via the " + +"--kubernetes-namespace command line argument.") + .stringConf + .createWithDefault("default") + + val DRIVER_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.driver.docker.image") + .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.") + .stringConf + .createWithDefault(s"spark-driver:$sparkVersion") + + val EXECUTOR_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc("Docker image to use for the executors. Specify this using the standard Docker tag " + +"format.") + .stringConf + .createWithDefault(s"spark-executor:$sparkVersion") + + val DOCKER_IMAGE_PULL_POLICY = +ConfigBuilder("spark.kubernetes.docker.image.pullPolicy") + .doc("Docker image pull policy when pulling any docker image in Kubernetes integration") + .stringConf + .createWithDefault("IfNotPresent") + + + val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = + "spark.kubernetes.authenticate.driver" + val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = + "spark.kubernetes.authenticate.driver.mounted" + val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" + val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" + val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" + val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" + val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + + val KUBERNETES_SERVICE_ACCOUNT_NAME = + ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + .doc("Service account that is used when running the driver pod. The driver pod uses " + +"this service account when requesting executor pods from the API server. 
If specific " + +"credentials are given for the driver pod to use, the driver will favor " + +"using those credentials instead.") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.driver.limit.cores") + .doc("Specify the hard cpu limit for the driver pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_MEMORY_OVERHEAD = +ConfigBuilder("spark.kubernetes.driver.memoryOverhead") + .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " + +"driver submission server. This is memory that accounts for things like VM overheads, " + +"interned strings, other native overheads, etc. This tends to grow with the driver's " + +"memory size (typically 6-10%).") + .bytesConf(ByteUnit.MiB) + .createOptional + + // Note that while we set a default for this when we start up the + // scheduler, the specific default value is
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154707001 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -119,5 +139,60 @@ private[spark] object Config extends Logging { "must be a positive integer") .createWithDefault(10) + val WAIT_FOR_APP_COMPLETION = +ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the " + +"launcher process.") + .booleanConf + .createWithDefault(true) + + val REPORT_INTERVAL = +ConfigBuilder("spark.kubernetes.report.interval") + .doc("Interval between reports of the current app status in cluster mode.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val JARS_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir") + .doc("Location to download jars to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-jars") + + private[spark] val FILES_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir") + .doc("Location to download files to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pods.") + .stringConf + .createWithDefault("/var/spark-data/spark-files") + + val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = +"spark.kubernetes.authenticate.submission" + val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector." + + val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." + val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." + + val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label." + val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation." + + val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv." + + def getK8sMasterUrl(rawMasterString: String): String = { +require(rawMasterString.startsWith("k8s://"), + "Master URL should start with k8s:// in Kubernetes mode.") +val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length) +if (masterWithoutK8sPrefix.startsWith("http://") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
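The quoted hunk cuts off at the scheme check. A hedged sketch of how such a resolver would complete, assuming https is treated as the default scheme for the API server when none is given (the actual else branch lies outside the quote):

    def getK8sMasterUrl(rawMasterString: String): String = {
      require(rawMasterString.startsWith("k8s://"),
        "Master URL should start with k8s:// in Kubernetes mode.")
      val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
      if (masterWithoutK8sPrefix.startsWith("http://") ||
          masterWithoutK8sPrefix.startsWith("https://")) {
        masterWithoutK8sPrefix
      } else {
        // No scheme given: assume TLS when talking to the API server.
        s"https://$masterWithoutK8sPrefix"
      }
    }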
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154694277 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.ServiceBuilder + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +/** + * Allows the driver to be reachable by executor pods through a headless service. The service's + * ports should correspond to the ports that the executor will reach the pod at for RPC. + */ +private[spark] class DriverServiceBootstrapStep( +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +submissionSparkConf: SparkConf, +clock: Clock) extends DriverConfigurationStep with Logging { + import DriverServiceBootstrapStep._ + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty, + s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " + + "address is managed and set to the driver pod's IP address.") +require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, + s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " + + "managed via a Kubernetes service.") + +val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX" +val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName +} else { + val randomServiceId = clock.getTimeMillis() + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + +s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " + +s"$shorterServiceName as the driver service's name.") + shorterServiceName +} + +val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) +val driverBlockManagerPort = submissionSparkConf.getInt( +org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) +val driverService = new ServiceBuilder() + .withNewMetadata() +.withName(resolvedServiceName) +.endMetadata() + .withNewSpec() +.withClusterIP("None") +.withSelector(driverLabels.asJava) +.addNewPort() + .withName(DRIVER_PORT_NAME) + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() +.addNewPort() + .withName(BLOCK_MANAGER_PORT_NAME) + .withPort(driverBlockManagerPort) + .withNewTargetPort(driverBlockManagerPort) + .endPort() +.endSpec() + .build() + +val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE) +val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" +val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname) --- End diff -- Ah, yeah, done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154572754 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.ServiceBuilder + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +/** + * Allows the driver to be reachable by executor pods through a headless service. The service's + * ports should correspond to the ports that the executor will reach the pod at for RPC. + */ +private[spark] class DriverServiceBootstrapStep( +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +submissionSparkConf: SparkConf, +clock: Clock) extends DriverConfigurationStep with Logging { + import DriverServiceBootstrapStep._ + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty, + s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " + + "address is managed and set to the driver pod's IP address.") +require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, + s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " + + "managed via a Kubernetes service.") + +val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX" +val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName +} else { + val randomServiceId = clock.getTimeMillis() + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + +s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " + +s"$shorterServiceName as the driver service's name.") + shorterServiceName +} + +val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) +val driverBlockManagerPort = submissionSparkConf.getInt( +org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) +val driverService = new ServiceBuilder() + .withNewMetadata() +.withName(resolvedServiceName) +.endMetadata() + .withNewSpec() +.withClusterIP("None") +.withSelector(driverLabels.asJava) +.addNewPort() + .withName(DRIVER_PORT_NAME) + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() +.addNewPort() + .withName(BLOCK_MANAGER_PORT_NAME) + .withPort(driverBlockManagerPort) + .withNewTargetPort(driverBlockManagerPort) + .endPort() +.endSpec() + .build() + +val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE) +val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" +val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname) --- End diff -- I meant isn't `org.apache.spark.internal.config.DRIVER_HOST_ADDRESS` just `DRIVER_HOST_KEY`? --- - To unsubscribe, e-mail:
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154569020 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh --- @@ -0,0 +1,37 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# echo commands to the terminal output +set -x --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154568773 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -590,6 +604,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | the node running the Application Master via the Secure | Distributed Cache, for renewing the login tickets and the | delegation tokens periodically. +| +| Kubernetes only: +| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the +| application must be launched. The namespace must already +| exist in the cluster. (Default: default). --- End diff -- Actually k8s does not yet support `kill` and `status`, nor does it support `spark.cores.max` yet. Updated `validateKillArguments` and `validateStatusRequestArguments`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
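A hedged, standalone sketch of the constraint described above; the predicate name is illustrative, while the master prefixes follow the validation code discussed elsewhere in this thread:

```scala
// Kubernetes masters must keep failing the kill/status validations until the
// backend actually supports those operations.
def supportsKillAndStatus(master: String): Boolean =
  master.startsWith("spark://") || master.startsWith("mesos://")

assert(supportsKillAndStatus("spark://host:7077"))
assert(supportsKillAndStatus("mesos://host:5050"))
assert(!supportsKillAndStatus("k8s://https://host:6443"))
```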
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154567885 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | the node running the Application Master via the Secure | Distributed Cache, for renewing the login tickets and the | delegation tokens periodically. +| +| Kubernetes only: +| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the +| application must be launched. The namespace must already +| exist in the cluster. (Default: default). --- End diff -- I think if we eventually decide to not have default docker images, we should make the options `--param` ones. I'm not sure if we want to make a call and do that in this PR though. Can we defer this to a later time when we are clearer on how we publish and maintain the images? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154567054 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile --- @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile . + +COPY examples /opt/spark/examples + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ +env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ +readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \ +if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS --- End diff -- I think this is debatable. They have some env variables with similar purposes, but they also have role-specific arguments/properties that probably make sense to warrant separate images. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154566501 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -119,5 +139,60 @@ private[spark] object Config extends Logging { "must be a positive integer") .createWithDefault(10) + val WAIT_FOR_APP_COMPLETION = +ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the " + +"launcher process.") + .booleanConf + .createWithDefault(true) + + val REPORT_INTERVAL = +ConfigBuilder("spark.kubernetes.report.interval") + .doc("Interval between reports of the current app status in cluster mode.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val JARS_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir") + .doc("Location to download jars to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-jars") + + private[spark] val FILES_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir") + .doc("Location to download files to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pods.") + .stringConf + .createWithDefault("/var/spark-data/spark-files") --- End diff -- See comment above. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154566357 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build the Driver Container, Driver Pod, and Kubernetes
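The scaladoc above describes the core orchestration pattern of the submission client. A simplified, runnable sketch of that pipeline, with stand-in types rather than the PR's real KubernetesDriverSpec:

```scala
// Stand-in for KubernetesDriverSpec; the real one carries the driver pod,
// container, Spark conf, and additional Kubernetes resources.
case class DriverSpec(labels: Map[String, String], env: Map[String, String])

trait DriverConfigurationStep {
  def configureDriver(spec: DriverSpec): DriverSpec
}

// Each step transforms the spec in turn; the final spec is what gets submitted.
def runDriverConfiguration(
    initial: DriverSpec,
    steps: Seq[DriverConfigurationStep]): DriverSpec =
  steps.foldLeft(initial)((spec, step) => step.configureDriver(spec))
```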
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154566070 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile --- @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile . + +COPY examples /opt/spark/examples + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ +env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ +readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \ +if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --- End diff -- `SPARK_EXECUTOR_PORT` is no longer set as `spark.executor.port` is no longer used by Spark. Removed `SPARK_EXECUTOR_PORT`. `SPARK_MOUNTED_CLASSPATH` is set in `DependencyResolutionStep`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
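Since `SPARK_MOUNTED_CLASSPATH` is only referenced here, a sketch of how a step such as `DependencyResolutionStep` could surface it may help; the helper name and the joining logic are illustrative, not quoted from the PR:

```scala
import io.fabric8.kubernetes.api.model.{Container, ContainerBuilder}

// Export the resolved jar paths to the entrypoint as a container env var, so the
// Dockerfile CMD can prepend them to SPARK_CLASSPATH.
def withMountedClasspath(container: Container, resolvedJars: Seq[String]): Container =
  new ContainerBuilder(container)
    .addNewEnv()
      .withName("SPARK_MOUNTED_CLASSPATH")
      .withValue(resolvedJars.mkString(":")) // classpath entries joined with ':'
      .endEnv()
    .build()
```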
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154566094 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.ServiceBuilder + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +/** + * Allows the driver to be reachable by executor pods through a headless service. The service's + * ports should correspond to the ports that the executor will reach the pod at for RPC. + */ +private[spark] class DriverServiceBootstrapStep( +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +submissionSparkConf: SparkConf, +clock: Clock) extends DriverConfigurationStep with Logging { + import DriverServiceBootstrapStep._ + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty, + s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " + + "address is managed and set to the driver pod's IP address.") +require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, + s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " + + "managed via a Kubernetes service.") + +val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX" +val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName +} else { + val randomServiceId = clock.getTimeMillis() + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + +s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " + +s"$shorterServiceName as the driver service's name.") + shorterServiceName +} + +val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) +val driverBlockManagerPort = submissionSparkConf.getInt( +org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) +val driverService = new ServiceBuilder() + .withNewMetadata() +.withName(resolvedServiceName) +.endMetadata() + .withNewSpec() +.withClusterIP("None") +.withSelector(driverLabels.asJava) +.addNewPort() + .withName(DRIVER_PORT_NAME) + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() +.addNewPort() + .withName(BLOCK_MANAGER_PORT_NAME) + .withPort(driverBlockManagerPort) + .withNewTargetPort(driverBlockManagerPort) + .endPort() +.endSpec() + .build() + +val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE) +val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" +val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname) --- End diff -- What do you meant here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail:
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154565597 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -296,6 +298,12 @@ object SparkSubmit extends CommandLineUtils with Logging { case (STANDALONE, CLUSTER) if args.isR => printErrorAndExit("Cluster deploy mode is currently not supported for R " + "applications on standalone clusters.") + case (KUBERNETES, CLIENT) => +printErrorAndExit("Client mode is currently not supported for Kubernetes.") + case (KUBERNETES, _) if args.isPython => +printErrorAndExit("Python applications are currently not supported for Kubernetes.") + case (KUBERNETES, _) if args.isR => +printErrorAndExit("R applications are currently not supported for Kubernetes.") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154564066 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -251,6 +252,7 @@ object SparkSubmit extends CommandLineUtils with Logging { YARN case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS + case m if m.startsWith("k8s") => KUBERNETES case m if m.startsWith("local") => LOCAL case _ => printErrorAndExit("Master must either be yarn or start with spark, mesos, local") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154554648 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.ServiceBuilder + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec +import org.apache.spark.internal.Logging +import org.apache.spark.util.Clock + +/** + * Allows the driver to be reachable by executor pods through a headless service. The service's + * ports should correspond to the ports that the executor will reach the pod at for RPC. + */ +private[spark] class DriverServiceBootstrapStep( +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +submissionSparkConf: SparkConf, +clock: Clock) extends DriverConfigurationStep with Logging { + import DriverServiceBootstrapStep._ + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +require(submissionSparkConf.getOption(DRIVER_BIND_ADDRESS_KEY).isEmpty, + s"$DRIVER_BIND_ADDRESS_KEY is not supported in Kubernetes mode, as the driver's bind " + + "address is managed and set to the driver pod's IP address.") +require(submissionSparkConf.getOption(DRIVER_HOST_KEY).isEmpty, + s"$DRIVER_HOST_KEY is not supported in Kubernetes mode, as the driver's hostname will be " + + "managed via a Kubernetes service.") + +val preferredServiceName = s"$kubernetesResourceNamePrefix$DRIVER_SVC_POSTFIX" +val resolvedServiceName = if (preferredServiceName.length <= MAX_SERVICE_NAME_LENGTH) { + preferredServiceName +} else { + val randomServiceId = clock.getTimeMillis() + val shorterServiceName = s"spark-$randomServiceId$DRIVER_SVC_POSTFIX" + logWarning(s"Driver's hostname would preferably be $preferredServiceName, but this is " + +s"too long (must be <= $MAX_SERVICE_NAME_LENGTH characters). 
Falling back to use " + +s"$shorterServiceName as the driver service's name.") + shorterServiceName +} + +val driverPort = submissionSparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT) +val driverBlockManagerPort = submissionSparkConf.getInt( +org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key, DEFAULT_BLOCKMANAGER_PORT) +val driverService = new ServiceBuilder() + .withNewMetadata() +.withName(resolvedServiceName) +.endMetadata() + .withNewSpec() +.withClusterIP("None") +.withSelector(driverLabels.asJava) +.addNewPort() + .withName(DRIVER_PORT_NAME) + .withPort(driverPort) + .withNewTargetPort(driverPort) + .endPort() +.addNewPort() + .withName(BLOCK_MANAGER_PORT_NAME) + .withPort(driverBlockManagerPort) + .withNewTargetPort(driverBlockManagerPort) + .endPort() +.endSpec() + .build() + +val namespace = submissionSparkConf.get(KUBERNETES_NAMESPACE) +val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local" +val resolvedSparkConf = driverSpec.driverSparkConf.clone() + .set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, driverHostname) --- End diff -- `DRIVER_HOST_KEY`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154549074 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -251,6 +252,7 @@ object SparkSubmit extends CommandLineUtils with Logging { YARN case m if m.startsWith("spark") => STANDALONE case m if m.startsWith("mesos") => MESOS + case m if m.startsWith("k8s") => KUBERNETES case m if m.startsWith("local") => LOCAL case _ => printErrorAndExit("Master must either be yarn or start with spark, mesos, local") --- End diff -- `Master must either be yarn or start with spark, mesos, k8s, local` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154549172 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -296,6 +298,12 @@ object SparkSubmit extends CommandLineUtils with Logging { case (STANDALONE, CLUSTER) if args.isR => printErrorAndExit("Cluster deploy mode is currently not supported for R " + "applications on standalone clusters.") + case (KUBERNETES, CLIENT) => +printErrorAndExit("Client mode is currently not supported for Kubernetes.") + case (KUBERNETES, _) if args.isPython => +printErrorAndExit("Python applications are currently not supported for Kubernetes.") + case (KUBERNETES, _) if args.isR => +printErrorAndExit("R applications are currently not supported for Kubernetes.") --- End diff -- nit: Not affect the result, but logically I think it is better: ```scala case (KUBERNETES, _) if args.isPython => printErrorAndExit("Python applications are currently not supported for Kubernetes.") case (KUBERNETES, _) if args.isR => printErrorAndExit("R applications are currently not supported for Kubernetes.") case (KUBERNETES, CLIENT) => printErrorAndExit("Client mode is currently not supported for Kubernetes.") ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154549733 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -590,6 +604,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | the node running the Application Master via the Secure | Distributed Cache, for renewing the login tickets and the | delegation tokens periodically. +| +| Kubernetes only: +| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the +| application must be launched. The namespace must already +| exist in the cluster. (Default: default). --- End diff -- There are some messages that need to be updated too, e.g.: | Spark standalone or Mesos with cluster deploy mode only: | --supervise If given, restarts the driver on failure. | --kill SUBMISSION_ID If given, kills the driver specified. | --status SUBMISSION_ID If given, requests the status of the driver specified. From the above, k8s supports killing submissions and requesting submission statuses. | Spark standalone and Mesos only: | --total-executor-cores NUM Total cores for all executors. k8s also supports the `totalExecutorCores` option. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154553669 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/executor/Dockerfile --- @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-executor:latest -f dockerfiles/executor/Dockerfile . + +COPY examples /opt/spark/examples + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ +env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ +readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \ +if ! [ -z ${SPARK_MOUNTED_CLASSPATH}+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp "$SPARK_CLASSPATH" org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP --- End diff -- For `SPARK_EXECUTOR_PORT`, I don't see the corresponding `ENV_EXECUTOR_PORT` has been set in `createExecutorPod`. Is it missing? ditto for`SPARK_MOUNTED_CLASSPATH`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154537698 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -304,7 +313,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } private def validateStatusRequestArguments(): Unit = { -if (!master.startsWith("spark://") && !master.startsWith("mesos://")) { +if (!master.startsWith("spark://") + && !master.startsWith("mesos://") + && !master.startsWith("k8s://")) { SparkSubmit.printErrorAndExit( "Requesting submission statuses is only supported in standalone or Mesos mode!") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154537691 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -294,7 +301,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } private def validateKillArguments(): Unit = { -if (!master.startsWith("spark://") && !master.startsWith("mesos://")) { +if (!master.startsWith("spark://") + && !master.startsWith("mesos://") + && !master.startsWith("k8s://")) { SparkSubmit.printErrorAndExit( "Killing submissions is only supported in standalone or Mesos mode!") --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154537632 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +private[spark] class DependencyResolutionStepSuite extends SparkFunSuite { --- End diff -- Done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154537561 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -119,5 +139,60 @@ private[spark] object Config extends Logging { "must be a positive integer") .createWithDefault(10) + val WAIT_FOR_APP_COMPLETION = +ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the " + +"launcher process.") + .booleanConf + .createWithDefault(true) + + val REPORT_INTERVAL = +ConfigBuilder("spark.kubernetes.report.interval") + .doc("Interval between reports of the current app status in cluster mode.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val JARS_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir") + .doc("Location to download jars to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-jars") --- End diff -- This is somewhat of an implementation detail, and we expect that users normally won't need to set it or even know about it, so having a default makes sense. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
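A short usage sketch of how a default like the one above plays out; the string-keyed form is used here because Spark's typed ConfigEntry getter is internal to Spark:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)

// Mirrors createWithDefault("/var/spark-data/spark-jars") in the quoted diff.
val jarsDownloadDir = conf
  .getOption("spark.kubernetes.mountDependencies.jarsDownloadDir")
  .getOrElse("/var/spark-data/spark-jars")

// Users who do care can still override it at submit time, e.g.
//   --conf spark.kubernetes.mountDependencies.jarsDownloadDir=/tmp/spark-jars
```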
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154502824 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile --- @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-alpine --- End diff -- I tracked down the parentage of this dependency tree and their licenses, openjdk: MIT alpine: BSD2 so we are ok. might need to list them in NOTICE though --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503544 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile --- @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-alpine + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile . --- End diff -- there is one more step before this, I think, it's to build spark first. could you add a link to http://spark.apache.org/docs/latest/building-spark.html --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503161 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build the Driver Container, Driver Pod, and Kubernetes
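A worked example of the `sliding(2, 2)` parsing shown in the diff above; the resource path and class name are purely illustrative:

```scala
val args = Array(
  "--primary-java-resource", "local:///opt/spark/examples/jars/app.jar",
  "--main-class", "org.example.Main",
  "--arg", "one",
  "--arg", "two")

// Flags and values are consumed in pairs, so repeated --arg flags accumulate
// in order. With the ClientArguments.fromCommandLineArgs in the diff, this yields:
//   mainAppResource = Some(JavaMainAppResource("local:///opt/spark/examples/jars/app.jar"))
//   mainClass       = "org.example.Main"
//   driverArgs      = Array("one", "two")
args.sliding(2, 2).foreach { case Array(flag, value) => println(s"$flag -> $value") }
```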
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503411 --- Diff: resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DependencyResolutionStepSuite.scala --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder} + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +private[spark] class DependencyResolutionStepSuite extends SparkFunSuite { --- End diff -- don't use `private[spark]` here; could you double check Jenkins' run --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503100 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.util.concurrent.TimeUnit + +import org.apache.spark.{SPARK_VERSION => sparkVersion} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.ConfigBuilder +import org.apache.spark.network.util.ByteUnit + +private[spark] object Config extends Logging { + + val KUBERNETES_NAMESPACE = +ConfigBuilder("spark.kubernetes.namespace") + .doc("The namespace that will be used for running the driver and executor pods. When using " + +"spark-submit in cluster mode, this can also be passed to spark-submit via the " + +"--kubernetes-namespace command line argument.") + .stringConf + .createWithDefault("default") + + val DRIVER_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.driver.docker.image") + .doc("Docker image to use for the driver. Specify this using the standard Docker tag format.") + .stringConf + .createWithDefault(s"spark-driver:$sparkVersion") + + val EXECUTOR_DOCKER_IMAGE = +ConfigBuilder("spark.kubernetes.executor.docker.image") + .doc("Docker image to use for the executors. Specify this using the standard Docker tag " + +"format.") + .stringConf + .createWithDefault(s"spark-executor:$sparkVersion") + + val DOCKER_IMAGE_PULL_POLICY = +ConfigBuilder("spark.kubernetes.docker.image.pullPolicy") + .doc("Docker image pull policy when pulling any docker image in Kubernetes integration") + .stringConf + .createWithDefault("IfNotPresent") + + + val KUBERNETES_AUTH_DRIVER_CONF_PREFIX = + "spark.kubernetes.authenticate.driver" + val KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX = + "spark.kubernetes.authenticate.driver.mounted" + val OAUTH_TOKEN_CONF_SUFFIX = "oauthToken" + val OAUTH_TOKEN_FILE_CONF_SUFFIX = "oauthTokenFile" + val CLIENT_KEY_FILE_CONF_SUFFIX = "clientKeyFile" + val CLIENT_CERT_FILE_CONF_SUFFIX = "clientCertFile" + val CA_CERT_FILE_CONF_SUFFIX = "caCertFile" + + val KUBERNETES_SERVICE_ACCOUNT_NAME = + ConfigBuilder(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.serviceAccountName") + .doc("Service account that is used when running the driver pod. The driver pod uses " + +"this service account when requesting executor pods from the API server. 
If specific " + +"credentials are given for the driver pod to use, the driver will favor " + +"using those credentials instead.") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.driver.limit.cores") + .doc("Specify the hard cpu limit for the driver pod") + .stringConf + .createOptional + + val KUBERNETES_EXECUTOR_LIMIT_CORES = +ConfigBuilder("spark.kubernetes.executor.limit.cores") + .doc("Specify the hard cpu limit for a single executor pod") + .stringConf + .createOptional + + val KUBERNETES_DRIVER_MEMORY_OVERHEAD = +ConfigBuilder("spark.kubernetes.driver.memoryOverhead") + .doc("The amount of off-heap memory (in megabytes) to be allocated for the driver and the " + +"driver submission server. This is memory that accounts for things like VM overheads, " + +"interned strings, other native overheads, etc. This tends to grow with the driver's " + +"memory size (typically 6-10%).") + .bytesConf(ByteUnit.MiB) + .createOptional + + // Note that while we set a default for this when we start up the + // scheduler, the specific default value is
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503612 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/entrypoint.sh --- @@ -0,0 +1,37 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# echo commands to the terminal output +set -x --- End diff -- consider `set -ex` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503222 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. 
+ * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( +submissionSteps: Seq[DriverConfigurationStep], +submissionSparkConf: SparkConf, +kubernetesClient: KubernetesClient, +waitForAppCompletion: Boolean, +appName: String, +loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** +* Run command that initializes a DriverSpec that will be updated after each +* DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec +* will be used to build the Driver Container, Driver Pod, and Kubernetes
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154502636 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -304,7 +313,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } private def validateStatusRequestArguments(): Unit = { -if (!master.startsWith("spark://") && !master.startsWith("mesos://")) { +if (!master.startsWith("spark://") + && !master.startsWith("mesos://") + && !master.startsWith("k8s://")) { SparkSubmit.printErrorAndExit( "Requesting submission statuses is only supported in standalone or Mesos mode!") --- End diff -- add `Kubernetes`
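A hedged sketch of what the updated check and message could look like, pulled into a stand-alone helper for illustration (the real change lives in SparkSubmitArguments and its exact wording may differ):

```scala
// Illustrative stand-alone version of the suggested validation.
def validateStatusRequestMaster(master: String): Unit = {
  require(
    master.startsWith("spark://") || master.startsWith("mesos://") ||
      master.startsWith("k8s://"),
    "Requesting submission statuses is only supported in standalone, Mesos or Kubernetes mode!")
}
```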
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154502901 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- @@ -702,6 +715,19 @@ object SparkSubmit extends CommandLineUtils with Logging { } } +if (isKubernetesCluster) { + childMainClass = "org.apache.spark.deploy.k8s.submit.Client" --- End diff -- let's wait for that and rebase this PR
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503509 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/driver/Dockerfile --- @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM spark-base + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-driver:latest -f dockerfiles/driver/Dockerfile . + +COPY examples /opt/spark/examples + +CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \ +env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \ +readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \ +if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \ +${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp "$SPARK_CLASSPATH" -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $SPARK_DRIVER_ARGS --- End diff -- what are the core differences between the driver and executor images, other than the `SPARK_DRIVER*` env vars? we could consider switching via a docker-entrypoint.sh script, like the one in https://github.com/apache/flink/blob/master/flink-contrib/docker-flink/docker-entrypoint.sh, and have one image instead.
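As a hedged illustration of that suggestion, a single shared image could dispatch on the first argument to its entrypoint; all variable names and flags below are hypothetical, not code from this PR:

```bash
#!/bin/bash
# Hypothetical single-image entrypoint in the spirit of the Flink
# docker-entrypoint.sh linked above: the first argument picks the role.
set -ex

SPARK_CLASSPATH="${SPARK_HOME}/jars/*"

case "$1" in
  driver)
    shift
    exec "${JAVA_HOME}/bin/java" -cp "$SPARK_CLASSPATH" \
      -Xms"$SPARK_DRIVER_MEMORY" -Xmx"$SPARK_DRIVER_MEMORY" "$@"
    ;;
  executor)
    shift
    exec "${JAVA_HOME}/bin/java" -cp "$SPARK_CLASSPATH" \
      -Xms"$SPARK_EXECUTOR_MEMORY" -Xmx"$SPARK_EXECUTOR_MEMORY" "$@"
    ;;
  *)
    # Fall through so the image is still usable for debugging shells.
    exec "$@"
    ;;
esac
```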
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503142 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -119,5 +139,60 @@ private[spark] object Config extends Logging { "must be a positive integer") .createWithDefault(10) + val WAIT_FOR_APP_COMPLETION = +ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the " + +"launcher process.") + .booleanConf + .createWithDefault(true) + + val REPORT_INTERVAL = +ConfigBuilder("spark.kubernetes.report.interval") + .doc("Interval between reports of the current app status in cluster mode.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val JARS_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir") + .doc("Location to download jars to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-jars") + + private[spark] val FILES_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir") + .doc("Location to download files to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pods.") + .stringConf + .createWithDefault("/var/spark-data/spark-files") + + val KUBERNETES_AUTH_SUBMISSION_CONF_PREFIX = +"spark.kubernetes.authenticate.submission" + val KUBERNETES_NODE_SELECTOR_PREFIX = "spark.kubernetes.node.selector." + + val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." + val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." + + val KUBERNETES_EXECUTOR_LABEL_PREFIX = "spark.kubernetes.executor.label." + val KUBERNETES_EXECUTOR_ANNOTATION_PREFIX = "spark.kubernetes.executor.annotation." + + val KUBERNETES_DRIVER_ENV_KEY = "spark.kubernetes.driverEnv." + + def getK8sMasterUrl(rawMasterString: String): String = { +require(rawMasterString.startsWith("k8s://"), + "Master URL should start with k8s:// in Kubernetes mode.") +val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length) +if (masterWithoutK8sPrefix.startsWith("http://")) --- End diff -- btw, on a side note, should there be a log/warning for http: (vs https:)? I don't think we have that case for other cluster managers
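If such a warning were added, it could look like the hedged sketch below. The method body is an illustrative, simplified variant of the one under review (`Config` already mixes in `Logging`, so `logWarning` is in scope); the warning line is the reviewer's suggestion, not code from the PR:

```scala
def getK8sMasterUrl(rawMasterString: String): String = {
  require(rawMasterString.startsWith("k8s://"),
    "Master URL should start with k8s:// in Kubernetes mode.")
  val masterWithoutK8sPrefix = rawMasterString.substring("k8s://".length)
  if (masterWithoutK8sPrefix.startsWith("http://")) {
    // Plain HTTP to the API server is almost always unintended; flag it.
    logWarning("Kubernetes master URL uses http:// rather than https://.")
  }
  masterWithoutK8sPrefix  // simplified; the real method also resolves defaults
}
```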
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503317 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala --- @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils + +private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { + def awaitCompletion(): Unit +} + +/** + * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on + * every state change and also at an interval for liveness. + * + * @param appId application ID. + * @param maybeLoggingInterval ms between each state request. If provided, must be a positive + * number. 
+ */ +private[k8s] class LoggingPodStatusWatcherImpl( +appId: String, +maybeLoggingInterval: Option[Long]) + extends LoggingPodStatusWatcher with Logging { + + private val podCompletedFuture = new CountDownLatch(1) + // start timer for periodic logging + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") + private val logRunnable: Runnable = new Runnable { +override def run() = logShortStatus() + } + + private var pod = Option.empty[Pod] + + private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + + def start(): Unit = { +maybeLoggingInterval.foreach { interval => + require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.") + scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) +} + } + + override def eventReceived(action: Action, pod: Pod): Unit = { +this.pod = Option(pod) +action match { + case Action.DELETED | Action.ERROR => +closeWatch() + + case _ => +logLongStatus() +if (hasCompleted()) { + closeWatch() +} +} + } + + override def onClose(e: KubernetesClientException): Unit = { +logDebug(s"Stopping watching application $appId with last-observed phase $phase") +closeWatch() + } + + private def logShortStatus() = { +logInfo(s"Application status for $appId (phase: $phase)") + } + + private def logLongStatus() = { +logInfo("State changed, new state: " + pod.map(formatPodState).getOrElse("unknown")) + } + + private def hasCompleted(): Boolean = { +phase == "Succeeded" || phase == "Failed" + } + + private def closeWatch(): Unit = { +podCompletedFuture.countDown() +scheduler.shutdown() + } + + private def formatPodState(pod: Pod): String = { +// TODO include specific container state --- End diff -- should this be tracked with a JIRA?
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154502635 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -294,7 +301,9 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } private def validateKillArguments(): Unit = { -if (!master.startsWith("spark://") && !master.startsWith("mesos://")) { +if (!master.startsWith("spark://") + && !master.startsWith("mesos://") + && !master.startsWith("k8s://")) { SparkSubmit.printErrorAndExit( "Killing submissions is only supported in standalone or Mesos mode!") --- End diff -- add `Kubernetes` to the msg
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503012 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -590,6 +600,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | the node running the Application Master via the Secure | Distributed Cache, for renewing the login tickets and the | delegation tokens periodically. +| +| Kubernetes only: +| --kubernetes-namespace NS The namespace in the Kubernetes cluster within which the +| application must be launched. The namespace must already +| exist in the cluster. (Default: default). --- End diff -- now that the docker image is without a default value (`spark.kubernetes.*.docker.image`) (yes, that discussion is ongoing), I wonder if it makes sense to bubble that up as a `--param` for visibility/convenience. tbh I'm generally against adding `--param`s to submit because of the potential confusion they can cause, but since we are here and there's already a `--kubernetes-namespace`
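For reference, an illustrative submission showing the namespace flag next to the image confs; the host, tags, and paths below are placeholders, not values from the PR:

```bash
bin/spark-submit \
  --master k8s://https://<api-server-host>:<port> \
  --deploy-mode cluster \
  --kubernetes-namespace spark-jobs \
  --conf spark.kubernetes.driver.docker.image=spark-driver:latest \
  --conf spark.kubernetes.executor.docker.image=spark-executor:latest \
  --class org.apache.spark.examples.SparkPi \
  local:///opt/spark/examples/jars/spark-examples.jar
```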
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503085 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -119,5 +139,60 @@ private[spark] object Config extends Logging { "must be a positive integer") .createWithDefault(10) + val WAIT_FOR_APP_COMPLETION = +ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the " + +"launcher process.") + .booleanConf + .createWithDefault(true) + + val REPORT_INTERVAL = +ConfigBuilder("spark.kubernetes.report.interval") + .doc("Interval between reports of the current app status in cluster mode.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val JARS_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir") + .doc("Location to download jars to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-jars") --- End diff -- does it make sense to have a default? perhaps a comment or doc string to explain why this value is the default?
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503638 --- Diff: resource-managers/kubernetes/docker/src/main/dockerfiles/spark-base/Dockerfile --- @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +FROM openjdk:8-alpine + +# If this docker file is being used in the context of building your images from a Spark distribution, the docker build +# command should be invoked from the top level directory of the Spark distribution. E.g.: +# docker build -t spark-base:latest -f dockerfiles/spark-base/Dockerfile . + +RUN apk upgrade --no-cache && \ --- End diff -- consider adding `set -ex`
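For illustration, the suggestion applied to the RUN chain; the package list shown is illustrative, not necessarily what the PR installs:

```dockerfile
# Fail the image build on the first error in the chain and echo each command.
RUN set -ex && \
    apk upgrade --no-cache && \
    apk add --no-cache bash
```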
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154503089 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala --- @@ -119,5 +139,60 @@ private[spark] object Config extends Logging { "must be a positive integer") .createWithDefault(10) + val WAIT_FOR_APP_COMPLETION = +ConfigBuilder("spark.kubernetes.submission.waitAppCompletion") + .doc("In cluster mode, whether to wait for the application to finish before exiting the " + +"launcher process.") + .booleanConf + .createWithDefault(true) + + val REPORT_INTERVAL = +ConfigBuilder("spark.kubernetes.report.interval") + .doc("Interval between reports of the current app status in cluster mode.") + .timeConf(TimeUnit.MILLISECONDS) + .createWithDefaultString("1s") + + private[spark] val JARS_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.jarsDownloadDir") + .doc("Location to download jars to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pod.") + .stringConf + .createWithDefault("/var/spark-data/spark-jars") + + private[spark] val FILES_DOWNLOAD_LOCATION = +ConfigBuilder("spark.kubernetes.mountDependencies.filesDownloadDir") + .doc("Location to download files to in the driver and executors. When using" + +" spark-submit, this directory must be empty and will be mounted as an empty directory" + +" volume on the driver and executor pods.") + .stringConf + .createWithDefault("/var/spark-data/spark-files") --- End diff -- ditto
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154168292 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/BaseDriverConfigurationStep.scala --- @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, EnvVarSourceBuilder, PodBuilder, QuantityBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.ConfigurationUtils +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Represents the initial setup required for the driver. + */ +private[spark] class BaseDriverConfigurationStep( +kubernetesAppId: String, +kubernetesResourceNamePrefix: String, +driverLabels: Map[String, String], +dockerImagePullPolicy: String, +appName: String, +mainClass: String, +appArgs: Array[String], +submissionSparkConf: SparkConf) extends DriverConfigurationStep { + + private val kubernetesDriverPodName = submissionSparkConf.get(KUBERNETES_DRIVER_POD_NAME) +.getOrElse(s"$kubernetesResourceNamePrefix-driver") + + private val driverExtraClasspath = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_CLASS_PATH) + + private val driverDockerImage = submissionSparkConf.get(DRIVER_DOCKER_IMAGE) + + // CPU settings + private val driverCpuCores = submissionSparkConf.getOption("spark.driver.cores").getOrElse("1") + private val driverLimitCores = submissionSparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES) + + // Memory settings + private val driverMemoryMiB = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY) + private val driverMemoryString = submissionSparkConf.get( +org.apache.spark.internal.config.DRIVER_MEMORY.key, +org.apache.spark.internal.config.DRIVER_MEMORY.defaultValueString) + private val memoryOverheadMiB = submissionSparkConf +.get(KUBERNETES_DRIVER_MEMORY_OVERHEAD) +.getOrElse(math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, + MEMORY_OVERHEAD_MIN_MIB)) + private val driverContainerMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB + + override def configureDriver( + driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverExtraClasspathEnv = driverExtraClasspath.map { classPath => + new EnvVarBuilder() +.withName(ENV_SUBMIT_EXTRA_CLASSPATH) +.withValue(classPath) +.build() +} + +val driverCustomAnnotations = ConfigurationUtils + .parsePrefixedKeyValuePairs( +submissionSparkConf, +KUBERNETES_DRIVER_ANNOTATION_PREFIX) 
+require(!driverCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), + s"Annotation with key $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for" + +" Spark bookkeeping operations.") + +val driverCustomEnvs = submissionSparkConf.getAllWithPrefix(KUBERNETES_DRIVER_ENV_KEY).toSeq + .map(env => new EnvVarBuilder() +.withName(env._1) +.withValue(env._2) +.build()) + +val allDriverAnnotations = driverCustomAnnotations ++ Map(SPARK_APP_NAME_ANNOTATION -> appName) + +val nodeSelector = ConfigurationUtils.parsePrefixedKeyValuePairs( + submissionSparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) + +val driverCpuQuantity = new QuantityBuilder(false) + .withAmount(driverCpuCores) + .build() +val driverMemoryQuantity = new QuantityBuilder(false) + .withAmount(s"${driverMemoryMiB}Mi") + .build() +val
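For context on the overhead computation in this step, a worked example; the factor and floor values below are assumptions based on Spark's usual defaults of 0.10 and 384 MiB:

```scala
val MEMORY_OVERHEAD_FACTOR = 0.10   // assumed default
val MEMORY_OVERHEAD_MIN_MIB = 384L  // assumed default

val driverMemoryMiB = 4096L
val memoryOverheadMiB =
  math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)
// memoryOverheadMiB == 409, so the driver container requests 4096 + 409 = 4505 MiB
```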
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154158221 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: MainAppResource, + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource.get, --- End diff -- Also added the missing step `DependencyResolutionStep` that is essential for supporting `local://` dependencies (jars and files).
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154153331 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/LoggingPodStatusWatcher.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.concurrent.{CountDownLatch, TimeUnit} + +import scala.collection.JavaConverters._ + +import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time} +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action + +import org.apache.spark.SparkException +import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils + +private[k8s] trait LoggingPodStatusWatcher extends Watcher[Pod] { + def awaitCompletion(): Unit +} + +/** + * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on + * every state change and also at an interval for liveness. + * + * @param appId application ID. + * @param maybeLoggingInterval ms between each state request. If provided, must be a positive + * number. + */ +private[k8s] class LoggingPodStatusWatcherImpl( + appId: String, maybeLoggingInterval: Option[Long]) +extends LoggingPodStatusWatcher with Logging { + + private val podCompletedFuture = new CountDownLatch(1) + // start timer for periodic logging + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") + private val logRunnable: Runnable = new Runnable { +override def run() = logShortStatus() + } + + private var pod = Option.empty[Pod] + + private def phase: String = pod.map(_.getStatus.getPhase).getOrElse("unknown") + + def start(): Unit = { +maybeLoggingInterval.foreach { interval => + require(interval > 0, s"Logging interval must be a positive time value, got: $interval ms.") + scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) +} + } + + override def eventReceived(action: Action, pod: Pod): Unit = { +this.pod = Option(pod) +action match { + case Action.DELETED => +closeWatch() + + case Action.ERROR => +closeWatch() --- End diff -- It's because in both `DELETED` and `ERROR` cases, the application is considered terminated. In other cases, we need to check the phase of the driver pod to determine if the application has terminated.
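A condensed sketch of that termination logic, simplified from the code above for illustration:

```scala
import io.fabric8.kubernetes.client.Watcher.Action

// DELETED and ERROR are terminal at the watch level; for any other event
// we fall back to inspecting the driver pod's phase.
def isTerminated(action: Action, phase: String): Boolean = action match {
  case Action.DELETED | Action.ERROR => true
  case _ => phase == "Succeeded" || phase == "Failed"
}
```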
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154143906 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.util.control.NonFatal + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.KubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: MainAppResource, + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { +var mainAppResource: Option[MainAppResource] = None +var mainClass: Option[String] = None +val driverArgs = mutable.ArrayBuffer.empty[String] + +args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => +mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => +mainClass = Some(clazz) + case Array("--arg", arg: String) => +driverArgs += arg + case other => +val invalid = other.mkString(" ") +throw new RuntimeException(s"Unknown arguments: $invalid") +} + +require(mainClass.isDefined, "Main class must be specified via --main-class") + +ClientArguments( + mainAppResource.get, --- End diff -- As @mridulm pointed out above, `mainAppResource` may be empty in cases such as the Spark examples, the Thrift server, etc. `--primary-java-resource` is set only if `mainAppResource != SparkLauncher.NO_RESOURCE` in `SparkSubmit`, so it could be empty here. I changed `ClientArguments` to take an `Option[MainAppResource]` instead.
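A hedged sketch of the submission-side behavior being described; the helper name is hypothetical, and only `SparkLauncher.NO_RESOURCE` comes from the discussion:

```scala
import org.apache.spark.launcher.SparkLauncher

// The flag is appended only when a real primary resource exists, so the
// Kubernetes client has to tolerate its absence.
def primaryResourceArgs(mainAppResource: String): Seq[String] =
  if (mainAppResource != SparkLauncher.NO_RESOURCE) {
    Seq("--primary-java-resource", mainAppResource)
  } else {
    Seq.empty
  }
```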
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154134373 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials + * to request executors. + */ +private[spark] class DriverKubernetesCredentialsStep( +submissionSparkConf: SparkConf, +kubernetesResourceNamePrefix: String) extends DriverConfigurationStep { + + private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") + private val maybeMountedClientKeyFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") + private val maybeMountedClientCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") + private val maybeMountedCaCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") + private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverSparkConf = driverSpec.driverSparkConf.clone() + +val oauthTokenBase64 = submissionSparkConf + .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") +.map { token => + BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8)) +} +val caCertDataBase64 = safeFileConfToBase64( +s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", +"Driver CA cert file provided at %s does not exist or is not a file.") +val clientKeyDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", +"Driver client key file provided at %s does not exist or is not a file.") +val clientCertDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", +"Driver client cert file provided at %s 
does not exist or is not a file.") + +val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations( +driverSparkConf, +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val kubernetesCredentialsSecret = createCredentialsSecret( +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret => + new PodBuilder(driverSpec.driverPod) +.editOrNewSpec() + .addNewVolume() +.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() +.endVolume() + .endSpec() +.build() +}.getOrElse( +
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154134483 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials + * to request executors. + */ +private[spark] class DriverKubernetesCredentialsStep( +submissionSparkConf: SparkConf, +kubernetesResourceNamePrefix: String) extends DriverConfigurationStep { + + private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") + private val maybeMountedClientKeyFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") + private val maybeMountedClientCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") + private val maybeMountedCaCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") + private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverSparkConf = driverSpec.driverSparkConf.clone() + +val oauthTokenBase64 = submissionSparkConf + .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") +.map { token => + BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8)) +} +val caCertDataBase64 = safeFileConfToBase64( +s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", +"Driver CA cert file provided at %s does not exist or is not a file.") +val clientKeyDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", +"Driver client key file provided at %s does not exist or is not a file.") +val clientCertDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", +"Driver client cert file provided at %s 
does not exist or is not a file.") + +val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations( +driverSparkConf, +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val kubernetesCredentialsSecret = createCredentialsSecret( +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret => + new PodBuilder(driverSpec.driverPod) +.editOrNewSpec() + .addNewVolume() +.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() +.endVolume() + .endSpec() +.build() +}.getOrElse( +
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user liyinan926 commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154134088 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesClientFactory.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} +import io.fabric8.kubernetes.client.utils.HttpClientUtils +import okhttp3.Dispatcher + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.util.ThreadUtils + +/** + * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to + * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL + * options for different components. + */ +private[spark] object KubernetesClientFactory { --- End diff -- Ah, it's duplicated. Removed this class.
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154016042 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesClientFactory.scala --- @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.Files +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient} +import io.fabric8.kubernetes.client.utils.HttpClientUtils +import okhttp3.Dispatcher + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.util.ThreadUtils + +/** + * Spark-opinionated builder for Kubernetes clients. It uses a prefix plus common suffixes to + * parse configuration keys, similar to the manner in which Spark's SecurityManager parses SSL + * options for different components. + */ +private[spark] object KubernetesClientFactory { --- End diff -- What's the difference between this and `SparkKubernetesClientFactory`?
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154029412 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala --- @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit.steps + +import java.io.File +import java.nio.charset.StandardCharsets + +import scala.collection.JavaConverters._ +import scala.language.implicitConversions + +import com.google.common.io.{BaseEncoding, Files} +import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder} + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec + +/** + * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials + * to request executors. + */ +private[spark] class DriverKubernetesCredentialsStep( +submissionSparkConf: SparkConf, +kubernetesResourceNamePrefix: String) extends DriverConfigurationStep { + + private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX") + private val maybeMountedClientKeyFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX") + private val maybeMountedClientCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX") + private val maybeMountedCaCertFile = submissionSparkConf.getOption( + s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX") + private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME) + + override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = { +val driverSparkConf = driverSpec.driverSparkConf.clone() + +val oauthTokenBase64 = submissionSparkConf + .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX") +.map { token => + BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8)) +} +val caCertDataBase64 = safeFileConfToBase64( +s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", +"Driver CA cert file provided at %s does not exist or is not a file.") +val clientKeyDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", +"Driver client key file provided at %s does not exist or is not a file.") +val clientCertDataBase64 = safeFileConfToBase64( + s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", +"Driver client cert file provided at %s does 
not exist or is not a file.") + +val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations( +driverSparkConf, +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val kubernetesCredentialsSecret = createCredentialsSecret( +oauthTokenBase64, +caCertDataBase64, +clientKeyDataBase64, +clientCertDataBase64) + +val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret => + new PodBuilder(driverSpec.driverPod) +.editOrNewSpec() + .addNewVolume() +.withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret() +.endVolume() + .endSpec() +.build() +}.getOrElse( +
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154029577

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverKubernetesCredentialsStep.scala ---
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit.steps
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+import scala.language.implicitConversions
+
+import com.google.common.io.{BaseEncoding, Files}
+import io.fabric8.kubernetes.api.model.{ContainerBuilder, PodBuilder, Secret, SecretBuilder}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec
+
+/**
+ * Mounts Kubernetes credentials into the driver pod. The driver will use such mounted credentials
+ * to request executors.
+ */
+private[spark] class DriverKubernetesCredentialsStep(
+    submissionSparkConf: SparkConf,
+    kubernetesResourceNamePrefix: String) extends DriverConfigurationStep {
+
+  private val maybeMountedOAuthTokenFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX")
+  private val maybeMountedClientKeyFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX")
+  private val maybeMountedClientCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX")
+  private val maybeMountedCaCertFile = submissionSparkConf.getOption(
+    s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX")
+  private val driverServiceAccount = submissionSparkConf.get(KUBERNETES_SERVICE_ACCOUNT_NAME)
+
+  override def configureDriver(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
+    val driverSparkConf = driverSpec.driverSparkConf.clone()
+
+    val oauthTokenBase64 = submissionSparkConf
+      .getOption(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX")
+      .map { token =>
+        BaseEncoding.base64().encode(token.getBytes(StandardCharsets.UTF_8))
+      }
+    val caCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX",
+      "Driver CA cert file provided at %s does not exist or is not a file.")
+    val clientKeyDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX",
+      "Driver client key file provided at %s does not exist or is not a file.")
+    val clientCertDataBase64 = safeFileConfToBase64(
+      s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX",
+      "Driver client cert file provided at %s does not exist or is not a file.")
+
+    val driverSparkConfWithCredentialsLocations = setDriverPodKubernetesCredentialLocations(
+      driverSparkConf,
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val kubernetesCredentialsSecret = createCredentialsSecret(
+      oauthTokenBase64,
+      caCertDataBase64,
+      clientKeyDataBase64,
+      clientCertDataBase64)
+
+    val driverPodWithMountedKubernetesCredentials = kubernetesCredentialsSecret.map { secret =>
+      new PodBuilder(driverSpec.driverPod)
+        .editOrNewSpec()
+          .addNewVolume()
+            .withName(DRIVER_CREDENTIALS_SECRET_VOLUME_NAME)
+            .withNewSecret().withSecretName(secret.getMetadata.getName).endSecret()
+            .endVolume()
+          .endSpec()
+        .build()
+    }.getOrElse(
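As a side note on the encoding pattern in this step: the `safeFileConfToBase64` calls above resolve a file path from a conf key and turn the file's contents into base64, which is the form a Kubernetes Secret's `data` map expects. Below is a minimal, self-contained sketch of that file-to-base64 step using the same Guava utilities the file imports. The helper name `fileToBase64` and its signature are illustrative assumptions, not the PR's exact implementation (which reads the path from the SparkConf first):

```scala
import java.io.File

import com.google.common.io.{BaseEncoding, Files}

// Illustrative stand-in (not the PR's code): verify the credential file
// exists, then base64-encode its raw bytes for a Secret's data map.
def fileToBase64(path: String, errMessage: String): String = {
  val file = new File(path)
  require(file.isFile, String.format(errMessage, path))
  BaseEncoding.base64().encode(Files.toByteArray(file))
}
```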
[GitHub] spark pull request #19717: [SPARK-22646] [Submission] Spark on Kubernetes - ...
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154022151

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.KubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+    mainAppResource: MainAppResource,
+    mainClass: String,
+    driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+    var mainAppResource: Option[MainAppResource] = None
+    var mainClass: Option[String] = None
+    val driverArgs = mutable.ArrayBuffer.empty[String]
+
+    args.sliding(2, 2).toList.foreach {
+      case Array("--primary-java-resource", primaryJavaResource: String) =>
+        mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+      case Array("--main-class", clazz: String) =>
+        mainClass = Some(clazz)
+      case Array("--arg", arg: String) =>
+        driverArgs += arg
+      case other =>
+        val invalid = other.mkString(" ")
+        throw new RuntimeException(s"Unknown arguments: $invalid")
+    }
+
+    require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+    ClientArguments(
+      mainAppResource.get,

--- End diff --

Don't we need to check whether `mainAppResource` is defined before here? Otherwise an exception will be thrown if it is not defined.
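For illustration only, a minimal sketch of the guard being suggested, mirroring the existing `mainClass` check just above the construction site. The error message and its mention of `--primary-java-resource` come from the parser in the diff; this is not necessarily how the PR ultimately resolved the comment:

```scala
// Fail fast with an actionable message instead of letting Option.get throw a
// bare NoSuchElementException when --primary-java-resource was not passed.
require(mainAppResource.isDefined,
  "Main app resource must be specified via --primary-java-resource")

// ... then construct ClientArguments as before; mainAppResource.get is now safe.
```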