Github user liyinan926 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19717#discussion_r155041880
  
    --- Diff: 
resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/Client.scala
 ---
    @@ -0,0 +1,234 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.k8s.submit
    +
    +import java.util.{Collections, UUID}
    +
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import io.fabric8.kubernetes.api.model._
    +import io.fabric8.kubernetes.client.KubernetesClient
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.deploy.SparkApplication
    +import org.apache.spark.deploy.k8s.Config._
    +import org.apache.spark.deploy.k8s.Constants._
    +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory
    +import org.apache.spark.deploy.k8s.submit.steps.DriverConfigurationStep
    +import org.apache.spark.internal.Logging
    +import org.apache.spark.util.Utils
    +
    +/**
    + * Encapsulates arguments to the submission client.
    + *
    + * @param mainAppResource the main application resource if any
    + * @param mainClass the main class of the application to run
    + * @param driverArgs arguments to the driver
    + */
    +private[spark] case class ClientArguments(
    +     mainAppResource: Option[MainAppResource],
    +     mainClass: String,
    +     driverArgs: Array[String])
    +
    +private[spark] object ClientArguments {
    +
    +  def fromCommandLineArgs(args: Array[String]): ClientArguments = {
    +    var mainAppResource: Option[MainAppResource] = None
    +    var mainClass: Option[String] = None
    +    val driverArgs = mutable.ArrayBuffer.empty[String]
    +
    +    args.sliding(2, 2).toList.foreach {
    +      case Array("--primary-java-resource", primaryJavaResource: String) =>
    +        mainAppResource = Some(JavaMainAppResource(primaryJavaResource))
    +      case Array("--main-class", clazz: String) =>
    +        mainClass = Some(clazz)
    +      case Array("--arg", arg: String) =>
    +        driverArgs += arg
    +      case other =>
    +        val invalid = other.mkString(" ")
    +        throw new RuntimeException(s"Unknown arguments: $invalid")
    +    }
    +
    +    require(mainClass.isDefined, "Main class must be specified via 
--main-class")
    +
    +    ClientArguments(
    +      mainAppResource,
    +      mainClass.get,
    +      driverArgs.toArray)
    +  }
    +}
    +
    +/**
    + * Submits a Spark application to run on Kubernetes by creating the driver 
pod and starting a
    + * watcher that monitors and logs the application status. Waits for the 
application to terminate if
    + * spark.kubernetes.submission.waitAppCompletion is true.
    + *
    + * @param submissionSteps steps that collectively configure the driver
    + * @param submissionSparkConf the submission client Spark configuration
    + * @param kubernetesClient the client to talk to the Kubernetes API server
    + * @param waitForAppCompletion a flag indicating whether the client should 
wait for the application
    + *                             to complete
    + * @param appName the application name
    + * @param loggingPodStatusWatcher a watcher that monitors and logs the 
application status
    + */
    +private[spark] class Client(
    +    submissionSteps: Seq[DriverConfigurationStep],
    +    submissionSparkConf: SparkConf,
    +    kubernetesClient: KubernetesClient,
    +    waitForAppCompletion: Boolean,
    +    appName: String,
    +    loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
    +
    +  private val driverJavaOptions = submissionSparkConf.get(
    +    org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
    +
    +   /**
    +    * Run command that initializes a DriverSpec that will be updated after 
each
    +    * DriverConfigurationStep in the sequence that is passed in. The final 
KubernetesDriverSpec
    +    * will be used to build the Driver Container, Driver Pod, and 
Kubernetes Resources
    +    */
    +  def run(): Unit = {
    +    var currentDriverSpec = 
KubernetesDriverSpec.initialSpec(submissionSparkConf)
    +    // submissionSteps contain steps necessary to take, to resolve varying
    +    // client arguments that are passed in, created by orchestrator
    +    for (nextStep <- submissionSteps) {
    +      currentDriverSpec = nextStep.configureDriver(currentDriverSpec)
    +    }
    +
    +    val resolvedDriverJavaOpts = currentDriverSpec
    +      .driverSparkConf
    +      // Remove this as the options are instead extracted and set 
individually below using
    +      // environment variables with prefix SPARK_JAVA_OPT_.
    +      .remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
    +      .getAll
    +      .map {
    +        case (confKey, confValue) => s"-D$confKey=$confValue"
    +      } ++ 
driverJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
    +    val driverJavaOptsEnvs: Seq[EnvVar] = 
resolvedDriverJavaOpts.zipWithIndex.map {
    +      case (option, index) =>
    +        new EnvVarBuilder()
    +          .withName(s"$ENV_JAVA_OPT_PREFIX$index")
    +          .withValue(option)
    +          .build()
    +    }
    +
    +    val resolvedDriverContainer = new 
ContainerBuilder(currentDriverSpec.driverContainer)
    +      .addAllToEnv(driverJavaOptsEnvs.asJava)
    +      .build()
    +    val resolvedDriverPod = new PodBuilder(currentDriverSpec.driverPod)
    +      .editSpec()
    +        .addToContainers(resolvedDriverContainer)
    +        .endSpec()
    +      .build()
    +
    +    Utils.tryWithResource(
    +        kubernetesClient
    +          .pods()
    +          .withName(resolvedDriverPod.getMetadata.getName)
    +          .watch(loggingPodStatusWatcher)) { _ =>
    --- End diff --
    
    Given the current state of this PR, I believe it's not far from being OK to 
merge. Breaking this up into multiple ones likely will not make the time to 
merge shorter, and possibly even make them harder to review because of lack of 
contexts in a single PR. IMO, I think we should keep this PR in its current 
form and dedicate the next one to integration tests. @felixcheung @foxish WDYT? 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to