Github user mccheah commented on a diff in the pull request: https://github.com/apache/spark/pull/19717#discussion_r154814332 --- Diff: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.submit + +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.api.model._ +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkApplication +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants._ +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep +import org.apache.spark.internal.Logging +import org.apache.spark.util.Utils + +/** + * Encapsulates arguments to the submission client. + * + * @param mainAppResource the main application resource if any + * @param mainClass the main class of the application to run + * @param driverArgs arguments to the driver + */ +private[spark] case class ClientArguments( + mainAppResource: Option[MainAppResource], + mainClass: String, + driverArgs: Array[String]) + +private[spark] object ClientArguments { + + def fromCommandLineArgs(args: Array[String]): ClientArguments = { + var mainAppResource: Option[MainAppResource] = None + var mainClass: Option[String] = None + val driverArgs = mutable.ArrayBuffer.empty[String] + + args.sliding(2, 2).toList.foreach { + case Array("--primary-java-resource", primaryJavaResource: String) => + mainAppResource = Some(JavaMainAppResource(primaryJavaResource)) + case Array("--main-class", clazz: String) => + mainClass = Some(clazz) + case Array("--arg", arg: String) => + driverArgs += arg + case other => + val invalid = other.mkString(" ") + throw new RuntimeException(s"Unknown arguments: $invalid") + } + + require(mainClass.isDefined, "Main class must be specified via --main-class") + + ClientArguments( + mainAppResource, + mainClass.get, + driverArgs.toArray) + } +} + +/** + * Submits a Spark application to run on Kubernetes by creating the driver pod and starting a + * watcher that monitors and logs the application status. Waits for the application to terminate if + * spark.kubernetes.submission.waitAppCompletion is true. + * + * @param submissionSteps steps that collectively configure the driver + * @param submissionSparkConf the submission client Spark configuration + * @param kubernetesClient the client to talk to the Kubernetes API server + * @param waitForAppCompletion a flag indicating whether the client should wait for the application + * to complete + * @param appName the application name + * @param loggingPodStatusWatcher a watcher that monitors and logs the application status + */ +private[spark] class Client( + submissionSteps: Seq[DriverConfigurationStep], + submissionSparkConf: SparkConf, + kubernetesClient: KubernetesClient, + waitForAppCompletion: Boolean, + appName: String, + loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging { + + private val driverJavaOptions = submissionSparkConf.get( + org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + + /** + * Run command that initializes a DriverSpec that will be updated after each + * DriverConfigurationStep in the sequence that is passed in. The final KubernetesDriverSpec + * will be used to build the Driver Container, Driver Pod, and Kubernetes Resources + */ + def run(): Unit = { + var currentDriverSpec = KubernetesDriverSpec.initialSpec(submissionSparkConf) + // submissionSteps contain steps necessary to take, to resolve varying + // client arguments that are passed in, created by orchestrator + for (nextStep <- submissionSteps) { + currentDriverSpec = nextStep.configureDriver(currentDriverSpec) + } + + val resolvedDriverJavaOpts = currentDriverSpec + .driverSparkConf + // Remove this as the options are instead extracted and set individually below using + // environment variables with prefix SPARK_JAVA_OPT_. + .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS) + .getAll + .map { + case (confKey, confValue) => s"-D$confKey=$confValue" + } ++ driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty) + val driverJavaOptsEnvs: Seq[EnvVar] = resolvedDriverJavaOpts.zipWithIndex.map { + case (option, index) => + new EnvVarBuilder() + .withName(s"$ENV_JAVA_OPT_PREFIX$index") + .withValue(option) + .build() + } + + val resolvedDriverContainer = new ContainerBuilder(currentDriverSpec.driverContainer) + .addAllToEnv(driverJavaOptsEnvs.asJava) + .build() + val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod) + .editSpec() + .addToContainers(resolvedDriverContainer) + .endSpec() + .build() + + Utils.tryWithResource( + kubernetesClient + .pods() + .withName(resolvedDriverPod.getMetadata.getName) + .watch(loggingPodStatusWatcher)) { _ => --- End diff -- In the interest of reducing the complexity of this particular change, I think it's fine to remove the logging pod status watcher for now and introduce it in a follow up commit. We can make a simpler version here. For example, we can have a simpler watcher that checks the pod ends up in the running state before some timeout. We can also explicitly not support waiting for app completion. Basically I'm curious as to how simple we can make this to minimize the amount of new code we're introducing in the first push. The tradeoff of course is that we're expiicitly limiting functionality we introduce in 2.3.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org