Github user mccheah commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r155039435

--- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala ---

@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.submit
+
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.client.KubernetesClient
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkApplication
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants._
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
+import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * Encapsulates arguments to the submission client.
+ *
+ * @param mainAppResource the main application resource if any
+ * @param mainClass the main class of the application to run
+ * @param driverArgs arguments to the driver
+ */
+private[spark] case class ClientArguments(
+    mainAppResource: Option[MainAppResource],
+    mainClass: String,
+    driverArgs: Array[String])
+
+private[spark] object ClientArguments {
+
+  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
+    var mainAppResource: Option[MainAppResource] = None
+    var mainClass: Option[String] = None
+    val driverArgs = mutable.ArrayBuffer.empty[String]
+
+    args.sliding(2, 2).toList.foreach {
+      case Array("--primary-java-resource", primaryJavaResource: String) =>
+        mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
+      case Array("--main-class", clazz: String) =>
+        mainClass = Some(clazz)
+      case Array("--arg", arg: String) =>
+        driverArgs += arg
+      case other =>
+        val invalid = other.mkString(" ")
+        throw new RuntimeException(s"Unknown arguments: $invalid")
+    }
+
+    require(mainClass.isDefined, "Main class must be specified via --main-class")
+
+    ClientArguments(
+      mainAppResource,
+      mainClass.get,
+      driverArgs.toArray)
+  }
+}
+
+/**
+ * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a
+ * watcher that monitors and logs the application status. Waits for the application to terminate if
+ * spark.kubernetes.submission.waitAppCompletion is true.
+ *
+ * @param submissionSteps steps that collectively configure the driver
+ * @param submissionSparkConf the submission client Spark configuration
+ * @param kubernetesClient the client to talk to the Kubernetes API server
+ * @param waitForAppCompletion a flag indicating whether the client should wait for the application
+ *                             to complete
+ * @param appName the application name
+ * @param loggingPodStatusWatcher a watcher that monitors and logs the application status
+ */
+private[spark] class Client(
+    submissionSteps: Seq[DriverConfigurationStep],
+    submissionSparkConf: SparkConf,
+    kubernetesClient: KubernetesClient,
+    waitForAppCompletion: Boolean,
+    appName: String,
+    loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
+
+  private val driverJavaOptions = submissionSparkConf.get(
+    org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+
+  /**
+   * Run command that initializes a DriverSpec that will be updated after each
+   * DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec
+   * will be used to build the Driver Container, Driver Pod, and Kubernetes Resources
+   */
+  def run(): Unit = {
+    var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf)
+    // submissionSteps contain steps necessary to take, to resolve varying
+    // client arguments that are passed in, created by orchestrator
+    for (nextStep <- submissionSteps) {
+      currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
+    }
+
+    val resolvedDriverJavaOpts = currentDriverSpec
+      .driverSparkConf
+      // Remove this as the options are instead extracted and set individually below using
+      // environment variables with prefix SPARK_JAVA_OPT_.
+      .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
+      .getAll
+      .map {
+        case (confKey, confValue) => s"-D$confKey=$confValue"
+      } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
+    val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map {
+      case (option, index) =>
+        new EnvVarBuilder()
+          .withName(s"$ENV_JAVA_OPT_PREFIX$index")
+          .withValue(option)
+          .build()
+    }
+
+    val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer)
+      .addAllToEnv(driverJavaOptsEnvs.asJava)
+      .build()
+    val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
+      .editSpec()
+        .addToContainers(resolvedDriverContainer)
+        .endSpec()
+      .build()
+
+    Utils.tryWithResource(
+        kubernetesClient
+          .pods()
+          .withName(resolvedDriverPod.getMetadata.getName)
+          .watch(loggingPodStatusWatcher)) { _ =>
+      val createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+      try {
+        if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
+          val otherKubernetesResources = currentDriverSpec.otherKubernetesResources
+          addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
+          kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
+        }
+      } catch {
+        case e: Throwable =>
--- End diff --

We don't want to attempt to do anything on `OutOfMemoryError`, for example. Perhaps we can register a shutdown hook that deletes the pod, and remove the shutdown hook after we successfully create everything. See `ShutdownHookManager`. But I think just catching `NonFatal` will deal with the majority of cases.
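To make that concrete, here is a minimal sketch of the suggested shape, reusing the names from the diff above (`kubernetesClient`, `createdDriverPod`, `currentDriverSpec`, `addDriverOwnerReference`); the exact hook wiring is illustrative, not code from this PR:

```scala
import scala.util.control.NonFatal

import org.apache.spark.util.ShutdownHookManager

// Register a hook so the driver pod is cleaned up if the JVM exits before the
// remaining Kubernetes resources have been created.
val cleanupHook = ShutdownHookManager.addShutdownHook { () =>
  kubernetesClient.pods().delete(createdDriverPod)
}
try {
  if (currentDriverSpec.otherKubernetesResources.nonEmpty) {
    val otherKubernetesResources = currentDriverSpec.otherKubernetesResources
    addDriverOwnerReference(createdDriverPod, otherKubernetesResources)
    kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
  }
  // Everything was created successfully, so the pod no longer needs to be
  // deleted on exit.
  ShutdownHookManager.removeShutdownHook(cleanupHook)
} catch {
  // NonFatal deliberately excludes OutOfMemoryError and other fatal errors,
  // so cleanup logic never runs in a JVM that may already be broken.
  case NonFatal(e) =>
    kubernetesClient.pods().delete(createdDriverPod)
    throw e
}
```

Note that the hook can still fire after the catch block has already deleted the pod; with the fabric8 client, deleting a resource that no longer exists should just be a no-op that returns false.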