GitHub user ssuchter commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20697#discussion_r194150705
  
    --- Diff: resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala ---
    @@ -0,0 +1,294 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.deploy.k8s.integrationtest
    +
    +import java.io.File
    +import java.nio.file.{Path, Paths}
    +import java.util.UUID
    +import java.util.regex.Pattern
    +
    +import scala.collection.JavaConverters._
    +
    +import com.google.common.io.PatternFilenameFilter
    +import io.fabric8.kubernetes.api.model.{Container, Pod}
    +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
    +import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
    +import org.scalatest.time.{Minutes, Seconds, Span}
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
    +import org.apache.spark.deploy.k8s.integrationtest.config._
    +
    +private[spark] class KubernetesSuite extends SparkFunSuite
    +  with BeforeAndAfterAll with BeforeAndAfter {
    +
    +  import KubernetesSuite._
    +
    +  private var testBackend: IntegrationTestBackend = _
    +  private var sparkHomeDir: Path = _
    +  private var kubernetesTestComponents: KubernetesTestComponents = _
    +  private var sparkAppConf: SparkAppConf = _
    +  private var image: String = _
    +  private var containerLocalSparkDistroExamplesJar: String = _
    +  private var appLocator: String = _
    +  private var driverPodName: String = _
    +
    +  override def beforeAll(): Unit = {
    +    // The scalatest-maven-plugin gives null values to system properties that are
    +    // referenced but not set. We need to remove these null-value properties before
    +    // initializing the test backend.
    +    val nullValueProperties = System.getProperties.asScala
    +      .filter(entry => entry._2.equals("null"))
    +      .map(entry => entry._1.toString)
    +    nullValueProperties.foreach { key =>
    +      System.clearProperty(key)
    +    }
    +
    +    val sparkDirProp = System.getProperty("spark.kubernetes.test.unpackSparkDir")
    +    require(sparkDirProp != null, "Spark home directory must be provided in system properties.")
    +    sparkHomeDir = Paths.get(sparkDirProp)
    +    require(sparkHomeDir.toFile.isDirectory,
    +      s"No directory found for spark home specified at $sparkHomeDir.")
    +    val imageTag = getTestImageTag
    +    val imageRepo = getTestImageRepo
    +    image = s"$imageRepo/spark:$imageTag"
    +
    +    val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
    +      .toFile
    +      .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\.jar$")))(0)
    +    containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" +
    +      s"${sparkDistroExamplesJarFile.getName}"
    +    testBackend = IntegrationTestBackendFactory.getTestBackend
    +    testBackend.initialize()
    +    kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
    +  }
    +
    +  override def afterAll(): Unit = {
    +    testBackend.cleanUp()
    +  }
    +
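    +  // Each test gets a unique app locator and driver pod name so that the driver and
    +  // executor pods it creates can be looked up by label without colliding across tests.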
    +  before {
    +    appLocator = UUID.randomUUID().toString.replaceAll("-", "")
    +    driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "")
    +    sparkAppConf = kubernetesTestComponents.newSparkAppConf()
    +      .set("spark.kubernetes.container.image", image)
    +      .set("spark.kubernetes.driver.pod.name", driverPodName)
    +      .set("spark.kubernetes.driver.label.spark-app-locator", appLocator)
    +      .set("spark.kubernetes.executor.label.spark-app-locator", appLocator)
    +    if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
    +      kubernetesTestComponents.createNamespace()
    +    }
    +  }
    +
    +  after {
    +    if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
    +      kubernetesTestComponents.deleteNamespace()
    +    }
    +    deleteDriverPod()
    +  }
    +
    +  test("Run SparkPi with no resources") {
    +    runSparkPiAndVerifyCompletion()
    +  }
    +
    +  test("Run SparkPi with a very long application name.") {
    +    sparkAppConf.set("spark.app.name", "long" * 40)
    +    runSparkPiAndVerifyCompletion()
    +  }
    +
    +  test("Run SparkPi with a master URL without a scheme.") {
    +    val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
    +    val k8sMasterUrl = if (url.getPort < 0) {
    +      s"k8s://${url.getHost}"
    +    } else {
    +      s"k8s://${url.getHost}:${url.getPort}"
    +    }
    +    sparkAppConf.set("spark.master", k8sMasterUrl)
    +    runSparkPiAndVerifyCompletion()
    +  }
    +
    +  test("Run SparkPi with an argument.") {
    +    runSparkPiAndVerifyCompletion(appArgs = Array("5"))
    +  }
    +
    +  test("Run SparkPi with custom labels, annotations, and environment 
variables.") {
    +    sparkAppConf
    +      .set("spark.kubernetes.driver.label.label1", "label1-value")
    +      .set("spark.kubernetes.driver.label.label2", "label2-value")
    +      .set("spark.kubernetes.driver.annotation.annotation1", 
"annotation1-value")
    +      .set("spark.kubernetes.driver.annotation.annotation2", 
"annotation2-value")
    +      .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
    +      .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
    +      .set("spark.kubernetes.executor.label.label1", "label1-value")
    +      .set("spark.kubernetes.executor.label.label2", "label2-value")
    +      .set("spark.kubernetes.executor.annotation.annotation1", 
"annotation1-value")
    +      .set("spark.kubernetes.executor.annotation.annotation2", 
"annotation2-value")
    +      .set("spark.executorEnv.ENV1", "VALUE1")
    +      .set("spark.executorEnv.ENV2", "VALUE2")
    +
    +    runSparkPiAndVerifyCompletion(
    +      driverPodChecker = (driverPod: Pod) => {
    +        doBasicDriverPodCheck(driverPod)
    +        checkCustomSettings(driverPod)
    +      },
    +      executorPodChecker = (executorPod: Pod) => {
    +        doBasicExecutorPodCheck(executorPod)
    +        checkCustomSettings(executorPod)
    +      })
    +  }
    +
    +  // TODO(ssuchter): Enable the below after debugging
    +  // test("Run PageRank using remote data file") {
    +  //   sparkAppConf
    +  //     .set("spark.kubernetes.mountDependencies.filesDownloadDir",
    +  //       CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
    +  //     .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
    +  //   runSparkPageRankAndVerifyCompletion(
    +  //     appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
    +  // }
    +
    +  private def runSparkPiAndVerifyCompletion(
    +      appResource: String = containerLocalSparkDistroExamplesJar,
    +      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
    +      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
    +      appArgs: Array[String] = Array.empty[String],
    +      appLocator: String = appLocator): Unit = {
    +    runSparkApplicationAndVerifyCompletion(
    +      appResource,
    +      SPARK_PI_MAIN_CLASS,
    +      Seq("Pi is roughly 3"),
    +      appArgs,
    +      driverPodChecker,
    +      executorPodChecker,
    +      appLocator)
    +  }
    +
    +  private def runSparkPageRankAndVerifyCompletion(
    +      appResource: String = containerLocalSparkDistroExamplesJar,
    +      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
    +      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
    +      appArgs: Array[String],
    +      appLocator: String = appLocator): Unit = {
    +    runSparkApplicationAndVerifyCompletion(
    +      appResource,
    +      SPARK_PAGE_RANK_MAIN_CLASS,
    +      Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
    +      appArgs,
    +      driverPodChecker,
    +      executorPodChecker,
    +      appLocator)
    +  }
    +
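    +  // Launches the application via spark-submit, locates the driver and executor pods
    +  // through the spark-app-locator label, applies the given pod checkers, and then
    +  // waits for the expected completion messages to appear in the driver log.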
    +  private def runSparkApplicationAndVerifyCompletion(
    +      appResource: String,
    +      mainClass: String,
    +      expectedLogOnCompletion: Seq[String],
    +      appArgs: Array[String],
    +      driverPodChecker: Pod => Unit,
    +      executorPodChecker: Pod => Unit,
    +      appLocator: String): Unit = {
    +    val appArguments = SparkAppArguments(
    +      mainAppResource = appResource,
    +      mainClass = mainClass,
    +      appArgs = appArgs)
    +    SparkAppLauncher.launch(
    +      appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
    +
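    +    // The driver pod is identified by the unique spark-app-locator label set in before().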
    +    val driverPod = kubernetesTestComponents.kubernetesClient
    +      .pods()
    +      .withLabel("spark-app-locator", appLocator)
    +      .withLabel("spark-role", "driver")
    +      .list()
    +      .getItems
    +      .get(0)
    +    driverPodChecker(driverPod)
    +
    +    val executorPods = kubernetesTestComponents.kubernetesClient
    +      .pods()
    +      .withLabel("spark-app-locator", appLocator)
    +      .withLabel("spark-role", "executor")
    +      .list()
    +      .getItems
    +    executorPods.asScala.foreach { pod =>
    +      executorPodChecker(pod)
    +    }
    +
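    +    // Poll the driver log until every expected completion message shows up, or time out.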
    +    Eventually.eventually(TIMEOUT, INTERVAL) {
    +      expectedLogOnCompletion.foreach { e =>
    +        assert(kubernetesTestComponents.kubernetesClient
    +          .pods()
    +          .withName(driverPod.getMetadata.getName)
    +          .getLog
    +          .contains(e), "The application did not complete.")
    +      }
    +    }
    +  }
    +
    +  private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
    +    assert(driverPod.getMetadata.getName === driverPodName)
    +    assert(driverPod.getSpec.getContainers.get(0).getImage === image)
    +    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
    +  }
    +
    +  private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
    +    assert(executorPod.getSpec.getContainers.get(0).getImage === image)
    +    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
    +  }
    +
    +  private def checkCustomSettings(pod: Pod): Unit = {
    +    assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
    +    assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
    +    assert(pod.getMetadata.getAnnotations.get("annotation1") === 
"annotation1-value")
    +    assert(pod.getMetadata.getAnnotations.get("annotation2") === 
"annotation2-value")
    +
    +    val container = pod.getSpec.getContainers.get(0)
    +    val envVars = container
    +      .getEnv
    +      .asScala
    +      .map { env =>
    +        (env.getName, env.getValue)
    +      }
    +      .toMap
    +    assert(envVars("ENV1") === "VALUE1")
    +    assert(envVars("ENV2") === "VALUE2")
    +  }
    +
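    +  // Deletes the driver pod explicitly and waits until the API server reports it gone.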
    +  private def deleteDriverPod(): Unit = {
    +    kubernetesTestComponents.kubernetesClient.pods().withName(driverPodName).delete()
    +    Eventually.eventually(TIMEOUT, INTERVAL) {
    +      assert(kubernetesTestComponents.kubernetesClient
    +        .pods()
    +        .withName(driverPodName)
    +        .get() == null)
    +    }
    +  }
    +}
    +
    +private[spark] object KubernetesSuite {
    +
    +  val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
    +  val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
    +  val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
    +  val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
    +
    +  // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
    +
    +  // val REMOTE_PAGE_RANK_DATA_FILE =
    +  //   "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
    +  // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
    +  //   s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
    +
    +  case object ShuffleNotReadyException extends Exception
    --- End diff --
    
    I commented this out because it's currently unused but will be used soon.
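    
    For reference, re-enabling it should just be a matter of uncommenting the test
    and the constants it depends on in the companion object. Roughly this (an
    untested sketch of the commented-out code above, once debugging is done):
    
        test("Run PageRank using remote data file") {
          sparkAppConf
            .set("spark.kubernetes.mountDependencies.filesDownloadDir",
              CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
            .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
          runSparkPageRankAndVerifyCompletion(
            appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
        }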

