Repository: spark Updated Branches: refs/heads/master b070ded28 -> f433ef786
[SPARK-23010][K8S] Initial checkin of k8s integration tests. These tests were developed in the https://github.com/apache-spark-on-k8s/spark-integration repo by several contributors. This is a copy of the current state into the main apache spark repo. The only changes from the current spark-integration repo state are: * Move the files from the repo root into resource-managers/kubernetes/integration-tests * Add a reference to these tests in the root README.md * Fix a path reference in dev/dev-run-integration-tests.sh * Add a TODO in include/util.sh ## What changes were proposed in this pull request? Incorporation of Kubernetes integration tests. ## How was this patch tested? This code has its own unit tests, but the main purpose is to provide the integration tests. I tested this on my laptop by running dev/dev-run-integration-tests.sh --spark-tgz ~/spark-2.4.0-SNAPSHOT-bin--.tgz The spark-integration tests have already been running for months in AMPLab, here is an example: https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-scheduled-spark-integration-master/ Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Sean Suchter <sean-git...@suchter.com> Author: Sean Suchter <ssuch...@pepperdata.com> Closes #20697 from ssuchter/ssuchter-k8s-integration-tests. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f433ef78 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f433ef78 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f433ef78 Branch: refs/heads/master Commit: f433ef786770e48e3594ad158ce9908f98ef0d9a Parents: b070ded Author: Sean Suchter <sean-git...@suchter.com> Authored: Fri Jun 8 15:15:24 2018 -0700 Committer: mcheah <mch...@palantir.com> Committed: Fri Jun 8 15:15:24 2018 -0700 ---------------------------------------------------------------------- README.md | 2 + dev/tox.ini | 2 +- pom.xml | 1 + .../kubernetes/integration-tests/README.md | 52 ++++ .../dev/dev-run-integration-tests.sh | 93 ++++++ .../integration-tests/dev/spark-rbac.yaml | 52 ++++ .../kubernetes/integration-tests/pom.xml | 155 ++++++++++ .../scripts/setup-integration-test-env.sh | 91 ++++++ .../src/test/resources/log4j.properties | 31 ++ .../k8s/integrationtest/KubernetesSuite.scala | 294 +++++++++++++++++++ .../KubernetesTestComponents.scala | 120 ++++++++ .../k8s/integrationtest/ProcessUtils.scala | 46 +++ .../integrationtest/SparkReadinessWatcher.scala | 41 +++ .../deploy/k8s/integrationtest/Utils.scala | 30 ++ .../backend/IntegrationTestBackend.scala | 43 +++ .../backend/minikube/Minikube.scala | 84 ++++++ .../backend/minikube/MinikubeTestBackend.scala | 42 +++ .../deploy/k8s/integrationtest/config.scala | 38 +++ .../deploy/k8s/integrationtest/constants.scala | 22 ++ 19 files changed, 1238 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/README.md ---------------------------------------------------------------------- diff --git a/README.md b/README.md index 1e521a7..531d330 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,8 @@ can be run using: Please see the guidance on how to [run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests). +There is also a Kubernetes integration test, see resource-managers/kubernetes/integration-tests/README.md + ## A Note About Hadoop Versions Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/dev/tox.ini ---------------------------------------------------------------------- diff --git a/dev/tox.ini b/dev/tox.ini index 583c1ea..28dad8f 100644 --- a/dev/tox.ini +++ b/dev/tox.ini @@ -16,4 +16,4 @@ [pycodestyle] ignore=E402,E731,E241,W503,E226,E722,E741,E305 max-line-length=100 -exclude=cloudpickle.py,heapq3.py,shared.py,python/docs/conf.py,work/*/*.py,python/.eggs/* +exclude=cloudpickle.py,heapq3.py,shared.py,python/docs/conf.py,work/*/*.py,python/.eggs/*,dist/* http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 883c096..23bbd3b 100644 --- a/pom.xml +++ b/pom.xml @@ -2705,6 +2705,7 @@ <id>kubernetes</id> <modules> <module>resource-managers/kubernetes/core</module> + <module>resource-managers/kubernetes/integration-tests</module> </modules> </profile> http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/README.md ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/README.md b/resource-managers/kubernetes/integration-tests/README.md new file mode 100644 index 0000000..b3863e6 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/README.md @@ -0,0 +1,52 @@ +--- +layout: global +title: Spark on Kubernetes Integration Tests +--- + +# Running the Kubernetes Integration Tests + +Note that the integration test framework is currently being heavily revised and +is subject to change. Note that currently the integration tests only run with Java 8. + +The simplest way to run the integration tests is to install and run Minikube, then run the following: + + dev/dev-run-integration-tests.sh + +The minimum tested version of Minikube is 0.23.0. The kube-dns addon must be enabled. Minikube should +run with a minimum of 3 CPUs and 4G of memory: + + minikube start --cpus 3 --memory 4096 + +You can download Minikube [here](https://github.com/kubernetes/minikube/releases). + +# Integration test customization + +Configuration of the integration test runtime is done through passing different arguments to the test script. The main useful options are outlined below. + +## Re-using Docker Images + +By default, the test framework will build new Docker images on every test execution. A unique image tag is generated, +and it is written to file at `target/imageTag.txt`. To reuse the images built in a previous run, or to use a Docker image tag +that you have built by other means already, pass the tag to the test script: + + dev/dev-run-integration-tests.sh --image-tag <tag> + +where if you still want to use images that were built before by the test framework: + + dev/dev-run-integration-tests.sh --image-tag $(cat target/imageTag.txt) + +## Spark Distribution Under Test + +The Spark code to test is handed to the integration test system via a tarball. Here is the option that is used to specify the tarball: + +* `--spark-tgz <path-to-tgz>` - set `<path-to-tgz>` to point to a tarball containing the Spark distribution to test. + +TODO: Don't require the packaging of the built Spark artifacts into this tarball, just read them out of the current tree. + +## Customizing the Namespace and Service Account + +* `--namespace <namespace>` - set `<namespace>` to the namespace in which the tests should be run. +* `--service-account <service account name>` - set `<service account name>` to the name of the Kubernetes service account to +use in the namespace specified by the `--namespace`. The service account is expected to have permissions to get, list, watch, +and create pods. For clusters with RBAC turned on, it's important that the right permissions are granted to the service account +in the namespace through an appropriate role and role binding. A reference RBAC configuration is provided in `dev/spark-rbac.yaml`. http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh new file mode 100755 index 0000000..ea893fa --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh @@ -0,0 +1,93 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +TEST_ROOT_DIR=$(git rev-parse --show-toplevel)/resource-managers/kubernetes/integration-tests + +cd "${TEST_ROOT_DIR}" + +DEPLOY_MODE="minikube" +IMAGE_REPO="docker.io/kubespark" +SPARK_TGZ="N/A" +IMAGE_TAG="N/A" +SPARK_MASTER= +NAMESPACE= +SERVICE_ACCOUNT= + +# Parse arguments +while (( "$#" )); do + case $1 in + --image-repo) + IMAGE_REPO="$2" + shift + ;; + --image-tag) + IMAGE_TAG="$2" + shift + ;; + --deploy-mode) + DEPLOY_MODE="$2" + shift + ;; + --spark-tgz) + SPARK_TGZ="$2" + shift + ;; + --spark-master) + SPARK_MASTER="$2" + shift + ;; + --namespace) + NAMESPACE="$2" + shift + ;; + --service-account) + SERVICE_ACCOUNT="$2" + shift + ;; + *) + break + ;; + esac + shift +done + +cd $TEST_ROOT_DIR + +properties=( + -Dspark.kubernetes.test.sparkTgz=$SPARK_TGZ \ + -Dspark.kubernetes.test.imageTag=$IMAGE_TAG \ + -Dspark.kubernetes.test.imageRepo=$IMAGE_REPO \ + -Dspark.kubernetes.test.deployMode=$DEPLOY_MODE +) + +if [ -n $NAMESPACE ]; +then + properties=( ${properties[@]} -Dspark.kubernetes.test.namespace=$NAMESPACE ) +fi + +if [ -n $SERVICE_ACCOUNT ]; +then + properties=( ${properties[@]} -Dspark.kubernetes.test.serviceAccountName=$SERVICE_ACCOUNT ) +fi + +if [ -n $SPARK_MASTER ]; +then + properties=( ${properties[@]} -Dspark.kubernetes.test.master=$SPARK_MASTER ) +fi + +../../../build/mvn integration-test ${properties[@]} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml b/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml new file mode 100644 index 0000000..a4c242f --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +apiVersion: v1 +kind: Namespace +metadata: + name: spark +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: spark-sa + namespace: spark +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRole +metadata: + name: spark-role +rules: +- apiGroups: + - "" + resources: + - "pods" + verbs: + - "*" +--- +apiVersion: rbac.authorization.k8s.io/v1beta1 +kind: ClusterRoleBinding +metadata: + name: spark-role-binding +subjects: +- kind: ServiceAccount + name: spark-sa + namespace: spark +roleRef: + kind: ClusterRole + name: spark-role + apiGroup: rbac.authorization.k8s.io \ No newline at end of file http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/pom.xml ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml new file mode 100644 index 0000000..520bda8 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/pom.xml @@ -0,0 +1,155 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.spark</groupId> + <artifactId>spark-parent_2.11</artifactId> + <version>2.4.0-SNAPSHOT</version> + <relativePath>../../../pom.xml</relativePath> + </parent> + + <artifactId>spark-kubernetes-integration-tests_2.11</artifactId> + <groupId>spark-kubernetes-integration-tests</groupId> + <properties> + <download-maven-plugin.version>1.3.0</download-maven-plugin.version> + <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version> + <extraScalaTestArgs></extraScalaTestArgs> + <kubernetes-client.version>3.0.0</kubernetes-client.version> + <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version> + <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version> + <sbt.project.name>kubernetes-integration-tests</sbt.project.name> + <spark.kubernetes.test.unpackSparkDir>${project.build.directory}/spark-dist-unpacked</spark.kubernetes.test.unpackSparkDir> + <spark.kubernetes.test.imageTag>N/A</spark.kubernetes.test.imageTag> + <spark.kubernetes.test.imageTagFile>${project.build.directory}/imageTag.txt</spark.kubernetes.test.imageTagFile> + <spark.kubernetes.test.deployMode>minikube</spark.kubernetes.test.deployMode> + <spark.kubernetes.test.imageRepo>docker.io/kubespark</spark.kubernetes.test.imageRepo> + <test.exclude.tags></test.exclude.tags> + </properties> + <packaging>jar</packaging> + <name>Spark Project Kubernetes Integration Tests</name> + + <dependencies> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.spark</groupId> + <artifactId>spark-core_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.fabric8</groupId> + <artifactId>kubernetes-client</artifactId> + <version>${kubernetes-client.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>${exec-maven-plugin.version}</version> + <executions> + <execution> + <id>setup-integration-test-env</id> + <phase>pre-integration-test</phase> + <goals> + <goal>exec</goal> + </goals> + <configuration> + <executable>scripts/setup-integration-test-env.sh</executable> + <arguments> + <argument>--unpacked-spark-tgz</argument> + <argument>${spark.kubernetes.test.unpackSparkDir}</argument> + + <argument>--image-repo</argument> + <argument>${spark.kubernetes.test.imageRepo}</argument> + + <argument>--image-tag</argument> + <argument>${spark.kubernetes.test.imageTag}</argument> + + <argument>--image-tag-output-file</argument> + <argument>${spark.kubernetes.test.imageTagFile}</argument> + + <argument>--deploy-mode</argument> + <argument>${spark.kubernetes.test.deployMode}</argument> + + <argument>--spark-tgz</argument> + <argument>${spark.kubernetes.test.sparkTgz}</argument> + </arguments> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <!-- Triggers scalatest plugin in the integration-test phase instead of + the test phase. --> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <version>${scalatest-maven-plugin.version}</version> + <configuration> + <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory> + <junitxml>.</junitxml> + <filereports>SparkTestSuite.txt</filereports> + <argLine>-ea -Xmx3g -XX:ReservedCodeCacheSize=512m ${extraScalaTestArgs}</argLine> + <stderr/> + <systemProperties> + <log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration> + <java.awt.headless>true</java.awt.headless> + <spark.kubernetes.test.imageTagFile>${spark.kubernetes.test.imageTagFile}</spark.kubernetes.test.imageTagFile> + <spark.kubernetes.test.unpackSparkDir>${spark.kubernetes.test.unpackSparkDir}</spark.kubernetes.test.unpackSparkDir> + <spark.kubernetes.test.imageRepo>${spark.kubernetes.test.imageRepo}</spark.kubernetes.test.imageRepo> + <spark.kubernetes.test.deployMode>${spark.kubernetes.test.deployMode}</spark.kubernetes.test.deployMode> + <spark.kubernetes.test.master>${spark.kubernetes.test.master}</spark.kubernetes.test.master> + <spark.kubernetes.test.namespace>${spark.kubernetes.test.namespace}</spark.kubernetes.test.namespace> + <spark.kubernetes.test.serviceAccountName>${spark.kubernetes.test.serviceAccountName}</spark.kubernetes.test.serviceAccountName> + </systemProperties> + <tagsToExclude>${test.exclude.tags}</tagsToExclude> + </configuration> + <executions> + <execution> + <id>test</id> + <goals> + <goal>test</goal> + </goals> + <configuration> + <!-- The negative pattern below prevents integration tests such as + KubernetesSuite from running in the test phase. --> + <suffixes>(?<!Suite)</suffixes> + </configuration> + </execution> + <execution> + <id>integration-test</id> + <phase>integration-test</phase> + <goals> + <goal>test</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + + </build> + +</project> http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh new file mode 100755 index 0000000..ccfb8e7 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh @@ -0,0 +1,91 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +TEST_ROOT_DIR=$(git rev-parse --show-toplevel) +UNPACKED_SPARK_TGZ="$TEST_ROOT_DIR/target/spark-dist-unpacked" +IMAGE_TAG_OUTPUT_FILE="$TEST_ROOT_DIR/target/image-tag.txt" +DEPLOY_MODE="minikube" +IMAGE_REPO="docker.io/kubespark" +IMAGE_TAG="N/A" +SPARK_TGZ="N/A" + +# Parse arguments +while (( "$#" )); do + case $1 in + --unpacked-spark-tgz) + UNPACKED_SPARK_TGZ="$2" + shift + ;; + --image-repo) + IMAGE_REPO="$2" + shift + ;; + --image-tag) + IMAGE_TAG="$2" + shift + ;; + --image-tag-output-file) + IMAGE_TAG_OUTPUT_FILE="$2" + shift + ;; + --deploy-mode) + DEPLOY_MODE="$2" + shift + ;; + --spark-tgz) + SPARK_TGZ="$2" + shift + ;; + *) + break + ;; + esac + shift +done + +if [[ $SPARK_TGZ == "N/A" ]]; +then + echo "Must specify a Spark tarball to build Docker images against with --spark-tgz." && exit 1; +fi + +rm -rf $UNPACKED_SPARK_TGZ +mkdir -p $UNPACKED_SPARK_TGZ +tar -xzvf $SPARK_TGZ --strip-components=1 -C $UNPACKED_SPARK_TGZ; + +if [[ $IMAGE_TAG == "N/A" ]]; +then + IMAGE_TAG=$(uuidgen); + cd $UNPACKED_SPARK_TGZ + if [[ $DEPLOY_MODE == cloud ]] ; + then + $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -r $IMAGE_REPO -t $IMAGE_TAG build + if [[ $IMAGE_REPO == gcr.io* ]] ; + then + gcloud docker -- push $IMAGE_REPO/spark:$IMAGE_TAG + else + $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -r $IMAGE_REPO -t $IMAGE_TAG push + fi + else + # -m option for minikube. + $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -m -r $IMAGE_REPO -t $IMAGE_TAG build + fi + cd - +fi + +rm -f $IMAGE_TAG_OUTPUT_FILE +echo -n $IMAGE_TAG > $IMAGE_TAG_OUTPUT_FILE http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties b/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties new file mode 100644 index 0000000..866126b --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties @@ -0,0 +1,31 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set everything to be logged to the file target/integration-tests.log +log4j.rootCategory=INFO, file +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.append=true +log4j.appender.file.file=target/integration-tests.log +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n + +# Ignore messages below warning level from a few verbose libraries. +log4j.logger.com.sun.jersey=WARN +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.eclipse.jetty=WARN +log4j.logger.org.mortbay=WARN +log4j.logger.org.spark_project.jetty=WARN http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala new file mode 100644 index 0000000..65c513c --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.File +import java.nio.file.{Path, Paths} +import java.util.UUID +import java.util.regex.Pattern + +import scala.collection.JavaConverters._ + +import com.google.common.io.PatternFilenameFilter +import io.fabric8.kubernetes.api.model.{Container, Pod} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.concurrent.{Eventually, PatienceConfiguration} +import org.scalatest.time.{Minutes, Seconds, Span} + +import org.apache.spark.SparkFunSuite +import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory} +import org.apache.spark.deploy.k8s.integrationtest.config._ + +private[spark] class KubernetesSuite extends SparkFunSuite + with BeforeAndAfterAll with BeforeAndAfter { + + import KubernetesSuite._ + + private var testBackend: IntegrationTestBackend = _ + private var sparkHomeDir: Path = _ + private var kubernetesTestComponents: KubernetesTestComponents = _ + private var sparkAppConf: SparkAppConf = _ + private var image: String = _ + private var containerLocalSparkDistroExamplesJar: String = _ + private var appLocator: String = _ + private var driverPodName: String = _ + + override def beforeAll(): Unit = { + // The scalatest-maven-plugin gives system properties that are referenced but not set null + // values. We need to remove the null-value properties before initializing the test backend. + val nullValueProperties = System.getProperties.asScala + .filter(entry => entry._2.equals("null")) + .map(entry => entry._1.toString) + nullValueProperties.foreach { key => + System.clearProperty(key) + } + + val sparkDirProp = System.getProperty("spark.kubernetes.test.unpackSparkDir") + require(sparkDirProp != null, "Spark home directory must be provided in system properties.") + sparkHomeDir = Paths.get(sparkDirProp) + require(sparkHomeDir.toFile.isDirectory, + s"No directory found for spark home specified at $sparkHomeDir.") + val imageTag = getTestImageTag + val imageRepo = getTestImageRepo + image = s"$imageRepo/spark:$imageTag" + + val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars")) + .toFile + .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0) + containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" + + s"${sparkDistroExamplesJarFile.getName}" + testBackend = IntegrationTestBackendFactory.getTestBackend + testBackend.initialize() + kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient) + } + + override def afterAll(): Unit = { + testBackend.cleanUp() + } + + before { + appLocator = UUID.randomUUID().toString.replaceAll("-", "") + driverPodName = "spark-test-app-" + UUID.randomUUID().toString.replaceAll("-", "") + sparkAppConf = kubernetesTestComponents.newSparkAppConf() + .set("spark.kubernetes.container.image", image) + .set("spark.kubernetes.driver.pod.name", driverPodName) + .set("spark.kubernetes.driver.label.spark-app-locator", appLocator) + .set("spark.kubernetes.executor.label.spark-app-locator", appLocator) + if (!kubernetesTestComponents.hasUserSpecifiedNamespace) { + kubernetesTestComponents.createNamespace() + } + } + + after { + if (!kubernetesTestComponents.hasUserSpecifiedNamespace) { + kubernetesTestComponents.deleteNamespace() + } + deleteDriverPod() + } + + test("Run SparkPi with no resources") { + runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with a very long application name.") { + sparkAppConf.set("spark.app.name", "long" * 40) + runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with a master URL without a scheme.") { + val url = kubernetesTestComponents.kubernetesClient.getMasterUrl + val k8sMasterUrl = if (url.getPort < 0) { + s"k8s://${url.getHost}" + } else { + s"k8s://${url.getHost}:${url.getPort}" + } + sparkAppConf.set("spark.master", k8sMasterUrl) + runSparkPiAndVerifyCompletion() + } + + test("Run SparkPi with an argument.") { + runSparkPiAndVerifyCompletion(appArgs = Array("5")) + } + + test("Run SparkPi with custom labels, annotations, and environment variables.") { + sparkAppConf + .set("spark.kubernetes.driver.label.label1", "label1-value") + .set("spark.kubernetes.driver.label.label2", "label2-value") + .set("spark.kubernetes.driver.annotation.annotation1", "annotation1-value") + .set("spark.kubernetes.driver.annotation.annotation2", "annotation2-value") + .set("spark.kubernetes.driverEnv.ENV1", "VALUE1") + .set("spark.kubernetes.driverEnv.ENV2", "VALUE2") + .set("spark.kubernetes.executor.label.label1", "label1-value") + .set("spark.kubernetes.executor.label.label2", "label2-value") + .set("spark.kubernetes.executor.annotation.annotation1", "annotation1-value") + .set("spark.kubernetes.executor.annotation.annotation2", "annotation2-value") + .set("spark.executorEnv.ENV1", "VALUE1") + .set("spark.executorEnv.ENV2", "VALUE2") + + runSparkPiAndVerifyCompletion( + driverPodChecker = (driverPod: Pod) => { + doBasicDriverPodCheck(driverPod) + checkCustomSettings(driverPod) + }, + executorPodChecker = (executorPod: Pod) => { + doBasicExecutorPodCheck(executorPod) + checkCustomSettings(executorPod) + }) + } + + // TODO(ssuchter): Enable the below after debugging + // test("Run PageRank using remote data file") { + // sparkAppConf + // .set("spark.kubernetes.mountDependencies.filesDownloadDir", + // CONTAINER_LOCAL_FILE_DOWNLOAD_PATH) + // .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE) + // runSparkPageRankAndVerifyCompletion( + // appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE)) + // } + + private def runSparkPiAndVerifyCompletion( + appResource: String = containerLocalSparkDistroExamplesJar, + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, + appArgs: Array[String] = Array.empty[String], + appLocator: String = appLocator): Unit = { + runSparkApplicationAndVerifyCompletion( + appResource, + SPARK_PI_MAIN_CLASS, + Seq("Pi is roughly 3"), + appArgs, + driverPodChecker, + executorPodChecker, + appLocator) + } + + private def runSparkPageRankAndVerifyCompletion( + appResource: String = containerLocalSparkDistroExamplesJar, + driverPodChecker: Pod => Unit = doBasicDriverPodCheck, + executorPodChecker: Pod => Unit = doBasicExecutorPodCheck, + appArgs: Array[String], + appLocator: String = appLocator): Unit = { + runSparkApplicationAndVerifyCompletion( + appResource, + SPARK_PAGE_RANK_MAIN_CLASS, + Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"), + appArgs, + driverPodChecker, + executorPodChecker, + appLocator) + } + + private def runSparkApplicationAndVerifyCompletion( + appResource: String, + mainClass: String, + expectedLogOnCompletion: Seq[String], + appArgs: Array[String], + driverPodChecker: Pod => Unit, + executorPodChecker: Pod => Unit, + appLocator: String): Unit = { + val appArguments = SparkAppArguments( + mainAppResource = appResource, + mainClass = mainClass, + appArgs = appArgs) + SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir) + + val driverPod = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", appLocator) + .withLabel("spark-role", "driver") + .list() + .getItems + .get(0) + driverPodChecker(driverPod) + + val executorPods = kubernetesTestComponents.kubernetesClient + .pods() + .withLabel("spark-app-locator", appLocator) + .withLabel("spark-role", "executor") + .list() + .getItems + executorPods.asScala.foreach { pod => + executorPodChecker(pod) + } + + Eventually.eventually(TIMEOUT, INTERVAL) { + expectedLogOnCompletion.foreach { e => + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPod.getMetadata.getName) + .getLog + .contains(e), "The application did not complete.") + } + } + } + + private def doBasicDriverPodCheck(driverPod: Pod): Unit = { + assert(driverPod.getMetadata.getName === driverPodName) + assert(driverPod.getSpec.getContainers.get(0).getImage === image) + assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver") + } + + private def doBasicExecutorPodCheck(executorPod: Pod): Unit = { + assert(executorPod.getSpec.getContainers.get(0).getImage === image) + assert(executorPod.getSpec.getContainers.get(0).getName === "executor") + } + + private def checkCustomSettings(pod: Pod): Unit = { + assert(pod.getMetadata.getLabels.get("label1") === "label1-value") + assert(pod.getMetadata.getLabels.get("label2") === "label2-value") + assert(pod.getMetadata.getAnnotations.get("annotation1") === "annotation1-value") + assert(pod.getMetadata.getAnnotations.get("annotation2") === "annotation2-value") + + val container = pod.getSpec.getContainers.get(0) + val envVars = container + .getEnv + .asScala + .map { env => + (env.getName, env.getValue) + } + .toMap + assert(envVars("ENV1") === "VALUE1") + assert(envVars("ENV2") === "VALUE2") + } + + private def deleteDriverPod(): Unit = { + kubernetesTestComponents.kubernetesClient.pods().withName(driverPodName).delete() + Eventually.eventually(TIMEOUT, INTERVAL) { + assert(kubernetesTestComponents.kubernetesClient + .pods() + .withName(driverPodName) + .get() == null) + } + } +} + +private[spark] object KubernetesSuite { + + val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes)) + val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds)) + val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi" + val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank" + + // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files" + + // val REMOTE_PAGE_RANK_DATA_FILE = + // "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt" + // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE = + // s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt" + + // case object ShuffleNotReadyException extends Exception +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala new file mode 100644 index 0000000..4872714 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.nio.file.{Path, Paths} +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import io.fabric8.kubernetes.client.DefaultKubernetesClient +import org.scalatest.concurrent.Eventually + +import org.apache.spark.internal.Logging + +private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) { + + val namespaceOption = Option(System.getProperty("spark.kubernetes.test.namespace")) + val hasUserSpecifiedNamespace = namespaceOption.isDefined + val namespace = namespaceOption.getOrElse(UUID.randomUUID().toString.replaceAll("-", "")) + private val serviceAccountName = + Option(System.getProperty("spark.kubernetes.test.serviceAccountName")) + .getOrElse("default") + val kubernetesClient = defaultClient.inNamespace(namespace) + val clientConfig = kubernetesClient.getConfiguration + + def createNamespace(): Unit = { + defaultClient.namespaces.createNew() + .withNewMetadata() + .withName(namespace) + .endMetadata() + .done() + } + + def deleteNamespace(): Unit = { + defaultClient.namespaces.withName(namespace).delete() + Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) { + val namespaceList = defaultClient + .namespaces() + .list() + .getItems + .asScala + require(!namespaceList.exists(_.getMetadata.getName == namespace)) + } + } + + def newSparkAppConf(): SparkAppConf = { + new SparkAppConf() + .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}") + .set("spark.kubernetes.namespace", namespace) + .set("spark.executor.memory", "500m") + .set("spark.executor.cores", "1") + .set("spark.executors.instances", "1") + .set("spark.app.name", "spark-test-app") + .set("spark.ui.enabled", "true") + .set("spark.testing", "false") + .set("spark.kubernetes.submission.waitAppCompletion", "false") + .set("spark.kubernetes.authenticate.driver.serviceAccountName", serviceAccountName) + } +} + +private[spark] class SparkAppConf { + + private val map = mutable.Map[String, String]() + + def set(key: String, value: String): SparkAppConf = { + map.put(key, value) + this + } + + def get(key: String): String = map.getOrElse(key, "") + + def setJars(jars: Seq[String]): Unit = set("spark.jars", jars.mkString(",")) + + override def toString: String = map.toString + + def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}")) +} + +private[spark] case class SparkAppArguments( + mainAppResource: String, + mainClass: String, + appArgs: Array[String]) + +private[spark] object SparkAppLauncher extends Logging { + + def launch( + appArguments: SparkAppArguments, + appConf: SparkAppConf, + timeoutSecs: Int, + sparkHomeDir: Path): Unit = { + val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit")) + logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf") + val appArgsArray = + if (appArguments.appArgs.length > 0) Array(appArguments.appArgs.mkString(" ")) + else Array[String]() + val commandLine = (Array(sparkSubmitExecutable.toFile.getAbsolutePath, + "--deploy-mode", "cluster", + "--class", appArguments.mainClass, + "--master", appConf.get("spark.master") + ) ++ appConf.toStringArray :+ + appArguments.mainAppResource) ++ + appArgsArray + ProcessUtils.executeProcess(commandLine, timeoutSecs) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala new file mode 100644 index 0000000..d8f3a6c --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.util.concurrent.TimeUnit + +import scala.collection.mutable.ArrayBuffer +import scala.io.Source + +import org.apache.spark.internal.Logging + +object ProcessUtils extends Logging { + /** + * executeProcess is used to run a command and return the output if it + * completes within timeout seconds. + */ + def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = { + val pb = new ProcessBuilder().command(fullCommand: _*) + pb.redirectErrorStream(true) + val proc = pb.start() + val outputLines = new ArrayBuffer[String] + Utils.tryWithResource(proc.getInputStream)( + Source.fromInputStream(_, "UTF-8").getLines().foreach { line => + logInfo(line) + outputLines += line + }) + assert(proc.waitFor(timeout, TimeUnit.SECONDS), + s"Timed out while executing ${fullCommand.mkString(" ")}") + assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}") + outputLines + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala new file mode 100644 index 0000000..f1fd6dc --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.util.concurrent.TimeUnit + +import com.google.common.util.concurrent.SettableFuture +import io.fabric8.kubernetes.api.model.HasMetadata +import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher} +import io.fabric8.kubernetes.client.Watcher.Action +import io.fabric8.kubernetes.client.internal.readiness.Readiness + +private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] { + + private val signal = SettableFuture.create[Boolean] + + override def eventReceived(action: Action, resource: T): Unit = { + if ((action == Action.MODIFIED || action == Action.ADDED) && + Readiness.isReady(resource)) { + signal.set(true) + } + } + + override def onClose(cause: KubernetesClientException): Unit = {} + + def waitUntilReady(): Boolean = signal.get(60, TimeUnit.SECONDS) +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala new file mode 100644 index 0000000..663f8b6 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.Closeable +import java.net.URI + +import org.apache.spark.internal.Logging + +object Utils extends Logging { + + def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { + val resource = createResource + try f.apply(resource) finally resource.close() + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala new file mode 100644 index 0000000..284712c --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.k8s.integrationtest.backend + +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend + +private[spark] trait IntegrationTestBackend { + def initialize(): Unit + def getKubernetesClient: DefaultKubernetesClient + def cleanUp(): Unit = {} +} + +private[spark] object IntegrationTestBackendFactory { + val deployModeConfigKey = "spark.kubernetes.test.deployMode" + + def getTestBackend: IntegrationTestBackend = { + val deployMode = Option(System.getProperty(deployModeConfigKey)) + .getOrElse("minikube") + if (deployMode == "minikube") { + MinikubeTestBackend + } else { + throw new IllegalArgumentException( + "Invalid " + deployModeConfigKey + ": " + deployMode) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala new file mode 100644 index 0000000..6494cbc --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.backend.minikube + +import java.io.File +import java.nio.file.Paths + +import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient} + +import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils +import org.apache.spark.internal.Logging + +// TODO support windows +private[spark] object Minikube extends Logging { + + private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60 + + def getMinikubeIp: String = { + val outputs = executeMinikube("ip") + .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$")) + assert(outputs.size == 1, "Unexpected amount of output from minikube ip") + outputs.head + } + + def getMinikubeStatus: MinikubeStatus.Value = { + val statusString = executeMinikube("status") + .filter(line => line.contains("minikubeVM: ") || line.contains("minikube:")) + .head + .replaceFirst("minikubeVM: ", "") + .replaceFirst("minikube: ", "") + MinikubeStatus.unapply(statusString) + .getOrElse(throw new IllegalStateException(s"Unknown status $statusString")) + } + + def getKubernetesClient: DefaultKubernetesClient = { + val kubernetesMaster = s"https://${getMinikubeIp}:8443" + val userHome = System.getProperty("user.home") + val kubernetesConf = new ConfigBuilder() + .withApiVersion("v1") + .withMasterUrl(kubernetesMaster) + .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath) + .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath) + .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath) + .build() + new DefaultKubernetesClient(kubernetesConf) + } + + private def executeMinikube(action: String, args: String*): Seq[String] = { + ProcessUtils.executeProcess( + Array("bash", "-c", s"minikube $action") ++ args, MINIKUBE_STARTUP_TIMEOUT_SECONDS) + } +} + +private[spark] object MinikubeStatus extends Enumeration { + + // The following states are listed according to + // https://github.com/docker/machine/blob/master/libmachine/state/state.go. + val STARTING = status("Starting") + val RUNNING = status("Running") + val PAUSED = status("Paused") + val STOPPING = status("Stopping") + val STOPPED = status("Stopped") + val ERROR = status("Error") + val TIMEOUT = status("Timeout") + val SAVED = status("Saved") + val NONE = status("") + + def status(value: String): Value = new Val(nextId, value) + def unapply(s: String): Option[Value] = values.find(s == _.toString) +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala new file mode 100644 index 0000000..cb93241 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest.backend.minikube + +import io.fabric8.kubernetes.client.DefaultKubernetesClient + +import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend + +private[spark] object MinikubeTestBackend extends IntegrationTestBackend { + + private var defaultClient: DefaultKubernetesClient = _ + + override def initialize(): Unit = { + val minikubeStatus = Minikube.getMinikubeStatus + require(minikubeStatus == MinikubeStatus.RUNNING, + s"Minikube must be running to use the Minikube backend for integration tests." + + s" Current status is: $minikubeStatus.") + defaultClient = Minikube.getKubernetesClient + } + + override def cleanUp(): Unit = { + super.cleanUp() + } + + override def getKubernetesClient: DefaultKubernetesClient = { + defaultClient + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala new file mode 100644 index 0000000..a81ef45 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +import java.io.File + +import com.google.common.base.Charsets +import com.google.common.io.Files + +package object config { + def getTestImageTag: String = { + val imageTagFileProp = System.getProperty("spark.kubernetes.test.imageTagFile") + require(imageTagFileProp != null, "Image tag file must be provided in system properties.") + val imageTagFile = new File(imageTagFileProp) + require(imageTagFile.isFile, s"No file found for image tag at ${imageTagFile.getAbsolutePath}.") + Files.toString(imageTagFile, Charsets.UTF_8).trim + } + + def getTestImageRepo: String = { + val imageRepo = System.getProperty("spark.kubernetes.test.imageRepo") + require(imageRepo != null, "Image repo must be provided in system properties.") + imageRepo + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala ---------------------------------------------------------------------- diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala new file mode 100644 index 0000000..0807a68 --- /dev/null +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.deploy.k8s.integrationtest + +package object constants { + val MINIKUBE_TEST_BACKEND = "minikube" + val GCE_TEST_BACKEND = "gce" +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org