Repository: spark
Updated Branches:
  refs/heads/master b070ded28 -> f433ef786


[SPARK-23010][K8S] Initial checkin of k8s integration tests.

These tests were developed in the 
https://github.com/apache-spark-on-k8s/spark-integration repo
by several contributors. This is a copy of the current state into the main
Apache Spark repo.
The only changes from the current spark-integration repo state are:
* Move the files from the repo root into 
resource-managers/kubernetes/integration-tests
* Add a reference to these tests in the root README.md
* Fix a path reference in dev/dev-run-integration-tests.sh
* Add a TODO in include/util.sh

## What changes were proposed in this pull request?

Incorporation of Kubernetes integration tests.

## How was this patch tested?

This code has its own unit tests, but its main purpose is to provide the
integration tests. I tested this on my laptop by running
dev/dev-run-integration-tests.sh --spark-tgz ~/spark-2.4.0-SNAPSHOT-bin--.tgz

The spark-integration tests have already been running for months on AMPLab
Jenkins; here is an example:
https://amplab.cs.berkeley.edu/jenkins/job/testing-k8s-scheduled-spark-integration-master/

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Sean Suchter <sean-git...@suchter.com>
Author: Sean Suchter <ssuch...@pepperdata.com>

Closes #20697 from ssuchter/ssuchter-k8s-integration-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f433ef78
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f433ef78
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f433ef78

Branch: refs/heads/master
Commit: f433ef786770e48e3594ad158ce9908f98ef0d9a
Parents: b070ded
Author: Sean Suchter <sean-git...@suchter.com>
Authored: Fri Jun 8 15:15:24 2018 -0700
Committer: mcheah <mch...@palantir.com>
Committed: Fri Jun 8 15:15:24 2018 -0700

----------------------------------------------------------------------
 README.md                                       |   2 +
 dev/tox.ini                                     |   2 +-
 pom.xml                                         |   1 +
 .../kubernetes/integration-tests/README.md      |  52 ++++
 .../dev/dev-run-integration-tests.sh            |  93 ++++++
 .../integration-tests/dev/spark-rbac.yaml       |  52 ++++
 .../kubernetes/integration-tests/pom.xml        | 155 ++++++++++
 .../scripts/setup-integration-test-env.sh       |  91 ++++++
 .../src/test/resources/log4j.properties         |  31 ++
 .../k8s/integrationtest/KubernetesSuite.scala   | 294 +++++++++++++++++++
 .../KubernetesTestComponents.scala              | 120 ++++++++
 .../k8s/integrationtest/ProcessUtils.scala      |  46 +++
 .../integrationtest/SparkReadinessWatcher.scala |  41 +++
 .../deploy/k8s/integrationtest/Utils.scala      |  30 ++
 .../backend/IntegrationTestBackend.scala        |  43 +++
 .../backend/minikube/Minikube.scala             |  84 ++++++
 .../backend/minikube/MinikubeTestBackend.scala  |  42 +++
 .../deploy/k8s/integrationtest/config.scala     |  38 +++
 .../deploy/k8s/integrationtest/constants.scala  |  22 ++
 19 files changed, 1238 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index 1e521a7..531d330 100644
--- a/README.md
+++ b/README.md
@@ -81,6 +81,8 @@ can be run using:
 Please see the guidance on how to
 [run tests for a module, or individual tests](http://spark.apache.org/developer-tools.html#individual-tests).
 
+There is also a Kubernetes integration test, see resource-managers/kubernetes/integration-tests/README.md
+
 ## A Note About Hadoop Versions
 
 Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/dev/tox.ini
----------------------------------------------------------------------
diff --git a/dev/tox.ini b/dev/tox.ini
index 583c1ea..28dad8f 100644
--- a/dev/tox.ini
+++ b/dev/tox.ini
@@ -16,4 +16,4 @@
 [pycodestyle]
 ignore=E402,E731,E241,W503,E226,E722,E741,E305
 max-line-length=100
-exclude=cloudpickle.py,heapq3.py,shared.py,python/docs/conf.py,work/*/*.py,python/.eggs/*
+exclude=cloudpickle.py,heapq3.py,shared.py,python/docs/conf.py,work/*/*.py,python/.eggs/*,dist/*

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 883c096..23bbd3b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2705,6 +2705,7 @@
       <id>kubernetes</id>
       <modules>
         <module>resource-managers/kubernetes/core</module>
+        <module>resource-managers/kubernetes/integration-tests</module>
       </modules>
     </profile>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/README.md
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/README.md b/resource-managers/kubernetes/integration-tests/README.md
new file mode 100644
index 0000000..b3863e6
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/README.md
@@ -0,0 +1,52 @@
+---
+layout: global
+title: Spark on Kubernetes Integration Tests
+---
+
+# Running the Kubernetes Integration Tests
+
+Note that the integration test framework is currently being heavily revised and
+is subject to change. Note that currently the integration tests only run with Java 8.
+
+The simplest way to run the integration tests is to install and run Minikube, then run the following:
+
+    dev/dev-run-integration-tests.sh
+
+The minimum tested version of Minikube is 0.23.0. The kube-dns addon must be enabled. Minikube should
+run with a minimum of 3 CPUs and 4G of memory:
+
+    minikube start --cpus 3 --memory 4096
+
+You can download Minikube [here](https://github.com/kubernetes/minikube/releases).
+
+# Integration test customization
+
+Configuration of the integration test runtime is done by passing different arguments to the test script. The most useful options are outlined below.
+
+## Re-using Docker Images
+
+By default, the test framework will build new Docker images on every test execution. A unique image tag is generated,
+and it is written to the file `target/imageTag.txt`. To reuse the images built in a previous run, or to use a Docker image tag
+that you have built by other means already, pass the tag to the test script:
+
+    dev/dev-run-integration-tests.sh --image-tag <tag>
+
+For example, to reuse the images that were built by a previous run of the test framework:
+
+    dev/dev-run-integration-tests.sh --image-tag $(cat target/imageTag.txt)
+
+## Spark Distribution Under Test
+
+The Spark code to test is handed to the integration test system via a tarball. Here is the option that is used to specify the tarball:
+
+* `--spark-tgz <path-to-tgz>` - set `<path-to-tgz>` to point to a tarball containing the Spark distribution to test.
+
+TODO: Don't require the packaging of the built Spark artifacts into this tarball, just read them out of the current tree.
+
+## Customizing the Namespace and Service Account
+
+* `--namespace <namespace>` - set `<namespace>` to the namespace in which the tests should be run.
+* `--service-account <service account name>` - set `<service account name>` to the name of the Kubernetes service account to
+use in the namespace specified by `--namespace`. The service account is expected to have permissions to get, list, watch,
+and create pods. For clusters with RBAC turned on, it's important that the right permissions are granted to the service account
+in the namespace through an appropriate role and role binding. A reference RBAC configuration is provided in `dev/spark-rbac.yaml`.
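
Taken together, a typical end-to-end run combining the options documented in
this README might look like the following sketch (the tarball path is a
placeholder; the `spark` namespace and `spark-sa` service account are the ones
defined in `dev/spark-rbac.yaml`):

    kubectl apply -f dev/spark-rbac.yaml
    dev/dev-run-integration-tests.sh \
      --spark-tgz <path-to-spark-tgz> \
      --namespace spark \
      --service-account spark-sa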

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
new file mode 100755
index 0000000..ea893fa
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/dev/dev-run-integration-tests.sh
@@ -0,0 +1,93 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+TEST_ROOT_DIR=$(git rev-parse --show-toplevel)/resource-managers/kubernetes/integration-tests
+
+cd "${TEST_ROOT_DIR}"
+
+DEPLOY_MODE="minikube"
+IMAGE_REPO="docker.io/kubespark"
+SPARK_TGZ="N/A"
+IMAGE_TAG="N/A"
+SPARK_MASTER=
+NAMESPACE=
+SERVICE_ACCOUNT=
+
+# Parse arguments
+while (( "$#" )); do
+  case $1 in
+    --image-repo)
+      IMAGE_REPO="$2"
+      shift
+      ;;
+    --image-tag)
+      IMAGE_TAG="$2"
+      shift
+      ;;
+    --deploy-mode)
+      DEPLOY_MODE="$2"
+      shift
+      ;;
+    --spark-tgz)
+      SPARK_TGZ="$2"
+      shift
+      ;;
+    --spark-master)
+      SPARK_MASTER="$2"
+      shift
+      ;;
+    --namespace)
+      NAMESPACE="$2"
+      shift
+      ;;
+    --service-account)
+      SERVICE_ACCOUNT="$2"
+      shift
+      ;;
+    *)
+      break
+      ;;
+  esac
+  shift
+done
+
+cd $TEST_ROOT_DIR
+
+properties=(
+  -Dspark.kubernetes.test.sparkTgz=$SPARK_TGZ \
+  -Dspark.kubernetes.test.imageTag=$IMAGE_TAG \
+  -Dspark.kubernetes.test.imageRepo=$IMAGE_REPO \
+  -Dspark.kubernetes.test.deployMode=$DEPLOY_MODE
+)
+
+if [ -n "$NAMESPACE" ];
+then
+  properties=( ${properties[@]} -Dspark.kubernetes.test.namespace=$NAMESPACE )
+fi
+
+if [ -n "$SERVICE_ACCOUNT" ];
+then
+  properties=( ${properties[@]} -Dspark.kubernetes.test.serviceAccountName=$SERVICE_ACCOUNT )
+fi
+
+if [ -n "$SPARK_MASTER" ];
+then
+  properties=( ${properties[@]} -Dspark.kubernetes.test.master=$SPARK_MASTER )
+fi
+
+../../../build/mvn integration-test ${properties[@]}
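
As a point of reference, with its defaults the script above boils down to a
Maven invocation along these lines (a sketch only; the namespace, service
account, and master properties are appended when the corresponding flags are
passed, and the tarball path is a placeholder):

    build/mvn integration-test \
      -Dspark.kubernetes.test.sparkTgz=<path-to-spark-tgz> \
      -Dspark.kubernetes.test.imageTag=N/A \
      -Dspark.kubernetes.test.imageRepo=docker.io/kubespark \
      -Dspark.kubernetes.test.deployMode=minikube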

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml b/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml
new file mode 100644
index 0000000..a4c242f
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: spark
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: spark-sa
+  namespace: spark
+---
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRole
+metadata:
+  name: spark-role
+rules:
+- apiGroups:
+  - ""
+  resources:
+  - "pods"
+  verbs:
+  - "*"
+---
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+  name: spark-role-binding
+subjects:
+- kind: ServiceAccount
+  name: spark-sa
+  namespace: spark
+roleRef:
+  kind: ClusterRole
+  name: spark-role
+  apiGroup: rbac.authorization.k8s.io
\ No newline at end of file
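
To stand up these RBAC objects before a run against an RBAC-enabled cluster,
plain kubectl is enough; for example (the namespace and service account names
are the ones defined in the manifest above):

    kubectl apply -f resource-managers/kubernetes/integration-tests/dev/spark-rbac.yaml
    kubectl get serviceaccount spark-sa -n spark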

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/pom.xml
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
new file mode 100644
index 0000000..520bda8
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -0,0 +1,155 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent_2.11</artifactId>
+    <version>2.4.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>spark-kubernetes-integration-tests_2.11</artifactId>
+  <groupId>spark-kubernetes-integration-tests</groupId>
+  <properties>
+    <download-maven-plugin.version>1.3.0</download-maven-plugin.version>
+    <exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
+    <extraScalaTestArgs></extraScalaTestArgs>
+    <kubernetes-client.version>3.0.0</kubernetes-client.version>
+    <scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
+    <scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
+    <sbt.project.name>kubernetes-integration-tests</sbt.project.name>
+    <spark.kubernetes.test.unpackSparkDir>${project.build.directory}/spark-dist-unpacked</spark.kubernetes.test.unpackSparkDir>
+    <spark.kubernetes.test.imageTag>N/A</spark.kubernetes.test.imageTag>
+    <spark.kubernetes.test.imageTagFile>${project.build.directory}/imageTag.txt</spark.kubernetes.test.imageTagFile>
+    <spark.kubernetes.test.deployMode>minikube</spark.kubernetes.test.deployMode>
+    <spark.kubernetes.test.imageRepo>docker.io/kubespark</spark.kubernetes.test.imageRepo>
+    <test.exclude.tags></test.exclude.tags>
+  </properties>
+  <packaging>jar</packaging>
+  <name>Spark Project Kubernetes Integration Tests</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-client</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>${exec-maven-plugin.version}</version>
+        <executions>
+          <execution>
+            <id>setup-integration-test-env</id>
+            <phase>pre-integration-test</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+            <configuration>
+              <executable>scripts/setup-integration-test-env.sh</executable>
+              <arguments>
+                <argument>--unpacked-spark-tgz</argument>
+                <argument>${spark.kubernetes.test.unpackSparkDir}</argument>
+
+                <argument>--image-repo</argument>
+                <argument>${spark.kubernetes.test.imageRepo}</argument>
+
+                <argument>--image-tag</argument>
+                <argument>${spark.kubernetes.test.imageTag}</argument>
+
+                <argument>--image-tag-output-file</argument>
+                <argument>${spark.kubernetes.test.imageTagFile}</argument>
+
+                <argument>--deploy-mode</argument>
+                <argument>${spark.kubernetes.test.deployMode}</argument>
+
+                <argument>--spark-tgz</argument>
+                <argument>${spark.kubernetes.test.sparkTgz}</argument>
+              </arguments>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <!-- Triggers scalatest plugin in the integration-test phase instead of
+             the test phase. -->
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>${scalatest-maven-plugin.version}</version>
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>SparkTestSuite.txt</filereports>
+          <argLine>-ea -Xmx3g -XX:ReservedCodeCacheSize=512m ${extraScalaTestArgs}</argLine>
+          <stderr/>
+          <systemProperties>
+            <log4j.configuration>file:src/test/resources/log4j.properties</log4j.configuration>
+            <java.awt.headless>true</java.awt.headless>
+            <spark.kubernetes.test.imageTagFile>${spark.kubernetes.test.imageTagFile}</spark.kubernetes.test.imageTagFile>
+            <spark.kubernetes.test.unpackSparkDir>${spark.kubernetes.test.unpackSparkDir}</spark.kubernetes.test.unpackSparkDir>
+            <spark.kubernetes.test.imageRepo>${spark.kubernetes.test.imageRepo}</spark.kubernetes.test.imageRepo>
+            <spark.kubernetes.test.deployMode>${spark.kubernetes.test.deployMode}</spark.kubernetes.test.deployMode>
+            <spark.kubernetes.test.master>${spark.kubernetes.test.master}</spark.kubernetes.test.master>
+            <spark.kubernetes.test.namespace>${spark.kubernetes.test.namespace}</spark.kubernetes.test.namespace>
+            <spark.kubernetes.test.serviceAccountName>${spark.kubernetes.test.serviceAccountName}</spark.kubernetes.test.serviceAccountName>
+          </systemProperties>
+          <tagsToExclude>${test.exclude.tags}</tagsToExclude>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+            <configuration>
+              <!-- The negative pattern below prevents integration tests such as
+                   KubernetesSuite from running in the test phase. -->
+              <suffixes>(?&lt;!Suite)</suffixes>
+            </configuration>
+          </execution>
+          <execution>
+            <id>integration-test</id>
+            <phase>integration-test</phase>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+
+  </build>
+
+</project>
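
Because this module is only activated by the kubernetes profile added to the
root pom, the integration-test phase can also be driven through Maven
directly. A sketch, assuming a build where the module's snapshot dependencies
resolve (the -pl module selector is standard Maven, and the property name
mirrors the plugin configuration above):

    build/mvn -Pkubernetes -pl resource-managers/kubernetes/integration-tests \
      integration-test \
      -Dspark.kubernetes.test.sparkTgz=<path-to-spark-tgz>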

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
new file mode 100755
index 0000000..ccfb8e7
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+TEST_ROOT_DIR=$(git rev-parse --show-toplevel)
+UNPACKED_SPARK_TGZ="$TEST_ROOT_DIR/target/spark-dist-unpacked"
+IMAGE_TAG_OUTPUT_FILE="$TEST_ROOT_DIR/target/image-tag.txt"
+DEPLOY_MODE="minikube"
+IMAGE_REPO="docker.io/kubespark"
+IMAGE_TAG="N/A"
+SPARK_TGZ="N/A"
+
+# Parse arguments
+while (( "$#" )); do
+  case $1 in
+    --unpacked-spark-tgz)
+      UNPACKED_SPARK_TGZ="$2"
+      shift
+      ;;
+    --image-repo)
+      IMAGE_REPO="$2"
+      shift
+      ;;
+    --image-tag)
+      IMAGE_TAG="$2"
+      shift
+      ;;
+    --image-tag-output-file)
+      IMAGE_TAG_OUTPUT_FILE="$2"
+      shift
+      ;;
+    --deploy-mode)
+      DEPLOY_MODE="$2"
+      shift
+      ;;
+    --spark-tgz)
+      SPARK_TGZ="$2"
+      shift
+      ;;
+    *)
+      break
+      ;;
+  esac
+  shift
+done
+
+if [[ $SPARK_TGZ == "N/A" ]];
+then
+  echo "Must specify a Spark tarball to build Docker images against with 
--spark-tgz." && exit 1;
+fi
+
+rm -rf $UNPACKED_SPARK_TGZ
+mkdir -p $UNPACKED_SPARK_TGZ
+tar -xzvf $SPARK_TGZ --strip-components=1 -C $UNPACKED_SPARK_TGZ;
+
+if [[ $IMAGE_TAG == "N/A" ]];
+then
+  IMAGE_TAG=$(uuidgen);
+  cd $UNPACKED_SPARK_TGZ
+  if [[ $DEPLOY_MODE == cloud ]] ;
+  then
+    $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -r $IMAGE_REPO -t $IMAGE_TAG build
+    if  [[ $IMAGE_REPO == gcr.io* ]] ;
+    then
+      gcloud docker -- push $IMAGE_REPO/spark:$IMAGE_TAG
+    else
+      $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -r $IMAGE_REPO -t $IMAGE_TAG push
+    fi
+  else
+    # -m option for minikube.
+    $UNPACKED_SPARK_TGZ/bin/docker-image-tool.sh -m -r $IMAGE_REPO -t $IMAGE_TAG build
+  fi
+  cd -
+fi
+
+rm -f $IMAGE_TAG_OUTPUT_FILE
+echo -n $IMAGE_TAG > $IMAGE_TAG_OUTPUT_FILE
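
Done by hand against an already-unpacked distribution, the image-build step of
this script reduces to the distribution's own docker-image-tool.sh (a sketch;
<image-tag> stands in for the uuidgen-generated tag, and -m points the build
at Minikube's Docker daemon):

    cd target/spark-dist-unpacked
    ./bin/docker-image-tool.sh -m -r docker.io/kubespark -t <image-tag> build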

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties b/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..866126b
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file target/integration-tests.log
+log4j.rootCategory=INFO, file
+log4j.appender.file=org.apache.log4j.FileAppender
+log4j.appender.file.append=true
+log4j.appender.file.file=target/integration-tests.log
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
+
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark_project.jetty=WARN
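
Since this configuration sends all test logging to a file rather than the
console, a run in progress can be followed from a second terminal, for
example:

    tail -f resource-managers/kubernetes/integration-tests/target/integration-tests.log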

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
new file mode 100644
index 0000000..65c513c
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+import java.nio.file.{Path, Paths}
+import java.util.UUID
+import java.util.regex.Pattern
+
+import scala.collection.JavaConverters._
+
+import com.google.common.io.PatternFilenameFilter
+import io.fabric8.kubernetes.api.model.{Container, Pod}
+import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Minutes, Seconds, Span}
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.deploy.k8s.integrationtest.backend.{IntegrationTestBackend, IntegrationTestBackendFactory}
+import org.apache.spark.deploy.k8s.integrationtest.config._
+
+private[spark] class KubernetesSuite extends SparkFunSuite
+  with BeforeAndAfterAll with BeforeAndAfter {
+
+  import KubernetesSuite._
+
+  private var testBackend: IntegrationTestBackend = _
+  private var sparkHomeDir: Path = _
+  private var kubernetesTestComponents: KubernetesTestComponents = _
+  private var sparkAppConf: SparkAppConf = _
+  private var image: String = _
+  private var containerLocalSparkDistroExamplesJar: String = _
+  private var appLocator: String = _
+  private var driverPodName: String = _
+
+  override def beforeAll(): Unit = {
+    // The scalatest-maven-plugin gives system properties that are referenced but not set null
+    // values. We need to remove the null-value properties before initializing the test backend.
+    val nullValueProperties = System.getProperties.asScala
+      .filter(entry => entry._2.equals("null"))
+      .map(entry => entry._1.toString)
+    nullValueProperties.foreach { key =>
+      System.clearProperty(key)
+    }
+
+    val sparkDirProp = System.getProperty("spark.kubernetes.test.unpackSparkDir")
+    require(sparkDirProp != null, "Spark home directory must be provided in system properties.")
+    sparkHomeDir = Paths.get(sparkDirProp)
+    require(sparkHomeDir.toFile.isDirectory,
+      s"No directory found for spark home specified at $sparkHomeDir.")
+    val imageTag = getTestImageTag
+    val imageRepo = getTestImageRepo
+    image = s"$imageRepo/spark:$imageTag"
+
+    val sparkDistroExamplesJarFile: File = sparkHomeDir.resolve(Paths.get("examples", "jars"))
+      .toFile
+      .listFiles(new PatternFilenameFilter(Pattern.compile("^spark-examples_.*\\.jar$")))(0)
+    containerLocalSparkDistroExamplesJar = s"local:///opt/spark/examples/jars/" +
+      s"${sparkDistroExamplesJarFile.getName}"
+    testBackend = IntegrationTestBackendFactory.getTestBackend
+    testBackend.initialize()
+    kubernetesTestComponents = new KubernetesTestComponents(testBackend.getKubernetesClient)
+  }
+
+  override def afterAll(): Unit = {
+    testBackend.cleanUp()
+  }
+
+  before {
+    appLocator = UUID.randomUUID().toString.replaceAll("-", "")
+    driverPodName = "spark-test-app-" + 
UUID.randomUUID().toString.replaceAll("-", "")
+    sparkAppConf = kubernetesTestComponents.newSparkAppConf()
+      .set("spark.kubernetes.container.image", image)
+      .set("spark.kubernetes.driver.pod.name", driverPodName)
+      .set("spark.kubernetes.driver.label.spark-app-locator", appLocator)
+      .set("spark.kubernetes.executor.label.spark-app-locator", appLocator)
+    if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
+      kubernetesTestComponents.createNamespace()
+    }
+  }
+
+  after {
+    if (!kubernetesTestComponents.hasUserSpecifiedNamespace) {
+      kubernetesTestComponents.deleteNamespace()
+    }
+    deleteDriverPod()
+  }
+
+  test("Run SparkPi with no resources") {
+    runSparkPiAndVerifyCompletion()
+  }
+
+  test("Run SparkPi with a very long application name.") {
+    sparkAppConf.set("spark.app.name", "long" * 40)
+    runSparkPiAndVerifyCompletion()
+  }
+
+  test("Run SparkPi with a master URL without a scheme.") {
+    val url = kubernetesTestComponents.kubernetesClient.getMasterUrl
+    val k8sMasterUrl = if (url.getPort < 0) {
+      s"k8s://${url.getHost}"
+    } else {
+      s"k8s://${url.getHost}:${url.getPort}"
+    }
+    sparkAppConf.set("spark.master", k8sMasterUrl)
+    runSparkPiAndVerifyCompletion()
+  }
+
+  test("Run SparkPi with an argument.") {
+    runSparkPiAndVerifyCompletion(appArgs = Array("5"))
+  }
+
+  test("Run SparkPi with custom labels, annotations, and environment 
variables.") {
+    sparkAppConf
+      .set("spark.kubernetes.driver.label.label1", "label1-value")
+      .set("spark.kubernetes.driver.label.label2", "label2-value")
+      .set("spark.kubernetes.driver.annotation.annotation1", 
"annotation1-value")
+      .set("spark.kubernetes.driver.annotation.annotation2", 
"annotation2-value")
+      .set("spark.kubernetes.driverEnv.ENV1", "VALUE1")
+      .set("spark.kubernetes.driverEnv.ENV2", "VALUE2")
+      .set("spark.kubernetes.executor.label.label1", "label1-value")
+      .set("spark.kubernetes.executor.label.label2", "label2-value")
+      .set("spark.kubernetes.executor.annotation.annotation1", 
"annotation1-value")
+      .set("spark.kubernetes.executor.annotation.annotation2", 
"annotation2-value")
+      .set("spark.executorEnv.ENV1", "VALUE1")
+      .set("spark.executorEnv.ENV2", "VALUE2")
+
+    runSparkPiAndVerifyCompletion(
+      driverPodChecker = (driverPod: Pod) => {
+        doBasicDriverPodCheck(driverPod)
+        checkCustomSettings(driverPod)
+      },
+      executorPodChecker = (executorPod: Pod) => {
+        doBasicExecutorPodCheck(executorPod)
+        checkCustomSettings(executorPod)
+      })
+  }
+
+  // TODO(ssuchter): Enable the below after debugging
+  // test("Run PageRank using remote data file") {
+  //   sparkAppConf
+  //     .set("spark.kubernetes.mountDependencies.filesDownloadDir",
+  //       CONTAINER_LOCAL_FILE_DOWNLOAD_PATH)
+  //     .set("spark.files", REMOTE_PAGE_RANK_DATA_FILE)
+  //   runSparkPageRankAndVerifyCompletion(
+  //     appArgs = Array(CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE))
+  // }
+
+  private def runSparkPiAndVerifyCompletion(
+      appResource: String = containerLocalSparkDistroExamplesJar,
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      appArgs: Array[String] = Array.empty[String],
+      appLocator: String = appLocator): Unit = {
+    runSparkApplicationAndVerifyCompletion(
+      appResource,
+      SPARK_PI_MAIN_CLASS,
+      Seq("Pi is roughly 3"),
+      appArgs,
+      driverPodChecker,
+      executorPodChecker,
+      appLocator)
+  }
+
+  private def runSparkPageRankAndVerifyCompletion(
+      appResource: String = containerLocalSparkDistroExamplesJar,
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      appArgs: Array[String],
+      appLocator: String = appLocator): Unit = {
+    runSparkApplicationAndVerifyCompletion(
+      appResource,
+      SPARK_PAGE_RANK_MAIN_CLASS,
+      Seq("1 has rank", "2 has rank", "3 has rank", "4 has rank"),
+      appArgs,
+      driverPodChecker,
+      executorPodChecker,
+      appLocator)
+  }
+
+  private def runSparkApplicationAndVerifyCompletion(
+      appResource: String,
+      mainClass: String,
+      expectedLogOnCompletion: Seq[String],
+      appArgs: Array[String],
+      driverPodChecker: Pod => Unit,
+      executorPodChecker: Pod => Unit,
+      appLocator: String): Unit = {
+    val appArguments = SparkAppArguments(
+      mainAppResource = appResource,
+      mainClass = mainClass,
+      appArgs = appArgs)
+    SparkAppLauncher.launch(appArguments, sparkAppConf, TIMEOUT.value.toSeconds.toInt, sparkHomeDir)
+
+    val driverPod = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-role", "driver")
+      .list()
+      .getItems
+      .get(0)
+    driverPodChecker(driverPod)
+
+    val executorPods = kubernetesTestComponents.kubernetesClient
+      .pods()
+      .withLabel("spark-app-locator", appLocator)
+      .withLabel("spark-role", "executor")
+      .list()
+      .getItems
+    executorPods.asScala.foreach { pod =>
+      executorPodChecker(pod)
+    }
+
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      expectedLogOnCompletion.foreach { e =>
+        assert(kubernetesTestComponents.kubernetesClient
+          .pods()
+          .withName(driverPod.getMetadata.getName)
+          .getLog
+          .contains(e), "The application did not complete.")
+      }
+    }
+  }
+
+  private def doBasicDriverPodCheck(driverPod: Pod): Unit = {
+    assert(driverPod.getMetadata.getName === driverPodName)
+    assert(driverPod.getSpec.getContainers.get(0).getImage === image)
+    assert(driverPod.getSpec.getContainers.get(0).getName === "spark-kubernetes-driver")
+  }
+
+  private def doBasicExecutorPodCheck(executorPod: Pod): Unit = {
+    assert(executorPod.getSpec.getContainers.get(0).getImage === image)
+    assert(executorPod.getSpec.getContainers.get(0).getName === "executor")
+  }
+
+  private def checkCustomSettings(pod: Pod): Unit = {
+    assert(pod.getMetadata.getLabels.get("label1") === "label1-value")
+    assert(pod.getMetadata.getLabels.get("label2") === "label2-value")
+    assert(pod.getMetadata.getAnnotations.get("annotation1") === 
"annotation1-value")
+    assert(pod.getMetadata.getAnnotations.get("annotation2") === 
"annotation2-value")
+
+    val container = pod.getSpec.getContainers.get(0)
+    val envVars = container
+      .getEnv
+      .asScala
+      .map { env =>
+        (env.getName, env.getValue)
+      }
+      .toMap
+    assert(envVars("ENV1") === "VALUE1")
+    assert(envVars("ENV2") === "VALUE2")
+  }
+
+  private def deleteDriverPod(): Unit = {
+    kubernetesTestComponents.kubernetesClient.pods().withName(driverPodName).delete()
+    Eventually.eventually(TIMEOUT, INTERVAL) {
+      assert(kubernetesTestComponents.kubernetesClient
+        .pods()
+        .withName(driverPodName)
+        .get() == null)
+    }
+  }
+}
+
+private[spark] object KubernetesSuite {
+
+  val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
+  val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))
+  val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+  val SPARK_PAGE_RANK_MAIN_CLASS: String = "org.apache.spark.examples.SparkPageRank"
+
+  // val CONTAINER_LOCAL_FILE_DOWNLOAD_PATH = "/var/spark-data/spark-files"
+
+  // val REMOTE_PAGE_RANK_DATA_FILE =
+  //   "https://storage.googleapis.com/spark-k8s-integration-tests/files/pagerank_data.txt"
+  // val CONTAINER_LOCAL_DOWNLOADED_PAGE_RANK_DATA_FILE =
+  //   s"$CONTAINER_LOCAL_FILE_DOWNLOAD_PATH/pagerank_data.txt"
+
+  // case object ShuffleNotReadyException extends Exception
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
new file mode 100644
index 0000000..4872714
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.nio.file.{Path, Paths}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+import org.scalatest.concurrent.Eventually
+
+import org.apache.spark.internal.Logging
+
+private[spark] class KubernetesTestComponents(defaultClient: DefaultKubernetesClient) {
+
+  val namespaceOption = Option(System.getProperty("spark.kubernetes.test.namespace"))
+  val hasUserSpecifiedNamespace = namespaceOption.isDefined
+  val namespace = namespaceOption.getOrElse(UUID.randomUUID().toString.replaceAll("-", ""))
+  private val serviceAccountName =
+    Option(System.getProperty("spark.kubernetes.test.serviceAccountName"))
+      .getOrElse("default")
+  val kubernetesClient = defaultClient.inNamespace(namespace)
+  val clientConfig = kubernetesClient.getConfiguration
+
+  def createNamespace(): Unit = {
+    defaultClient.namespaces.createNew()
+      .withNewMetadata()
+      .withName(namespace)
+      .endMetadata()
+      .done()
+  }
+
+  def deleteNamespace(): Unit = {
+    defaultClient.namespaces.withName(namespace).delete()
+    Eventually.eventually(KubernetesSuite.TIMEOUT, KubernetesSuite.INTERVAL) {
+      val namespaceList = defaultClient
+        .namespaces()
+        .list()
+        .getItems
+        .asScala
+      require(!namespaceList.exists(_.getMetadata.getName == namespace))
+    }
+  }
+
+  def newSparkAppConf(): SparkAppConf = {
+    new SparkAppConf()
+      .set("spark.master", s"k8s://${kubernetesClient.getMasterUrl}")
+      .set("spark.kubernetes.namespace", namespace)
+      .set("spark.executor.memory", "500m")
+      .set("spark.executor.cores", "1")
+      .set("spark.executors.instances", "1")
+      .set("spark.app.name", "spark-test-app")
+      .set("spark.ui.enabled", "true")
+      .set("spark.testing", "false")
+      .set("spark.kubernetes.submission.waitAppCompletion", "false")
+      .set("spark.kubernetes.authenticate.driver.serviceAccountName", 
serviceAccountName)
+  }
+}
+
+private[spark] class SparkAppConf {
+
+  private val map = mutable.Map[String, String]()
+
+  def set(key: String, value: String): SparkAppConf = {
+    map.put(key, value)
+    this
+  }
+
+  def get(key: String): String = map.getOrElse(key, "")
+
+  def setJars(jars: Seq[String]): Unit = set("spark.jars", jars.mkString(","))
+
+  override def toString: String = map.toString
+
+  def toStringArray: Iterable[String] = map.toList.flatMap(t => List("--conf", s"${t._1}=${t._2}"))
+}
+
+private[spark] case class SparkAppArguments(
+    mainAppResource: String,
+    mainClass: String,
+    appArgs: Array[String])
+
+private[spark] object SparkAppLauncher extends Logging {
+
+  def launch(
+      appArguments: SparkAppArguments,
+      appConf: SparkAppConf,
+      timeoutSecs: Int,
+      sparkHomeDir: Path): Unit = {
+    val sparkSubmitExecutable = sparkHomeDir.resolve(Paths.get("bin", "spark-submit"))
+    logInfo(s"Launching a spark app with arguments $appArguments and conf $appConf")
+    val appArgsArray =
+      if (appArguments.appArgs.length > 0) Array(appArguments.appArgs.mkString(" "))
+      else Array[String]()
+    val commandLine = (Array(sparkSubmitExecutable.toFile.getAbsolutePath,
+      "--deploy-mode", "cluster",
+      "--class", appArguments.mainClass,
+      "--master", appConf.get("spark.master")
+    ) ++ appConf.toStringArray :+
+      appArguments.mainAppResource) ++
+      appArgsArray
+    ProcessUtils.executeProcess(commandLine, timeoutSecs)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
new file mode 100644
index 0000000..d8f3a6c
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ProcessUtils.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable.ArrayBuffer
+import scala.io.Source
+
+import org.apache.spark.internal.Logging
+
+object ProcessUtils extends Logging {
+  /**
+   * executeProcess is used to run a command and return the output if it
+   * completes within timeout seconds.
+   */
+  def executeProcess(fullCommand: Array[String], timeout: Long): Seq[String] = {
+    val pb = new ProcessBuilder().command(fullCommand: _*)
+    pb.redirectErrorStream(true)
+    val proc = pb.start()
+    val outputLines = new ArrayBuffer[String]
+    Utils.tryWithResource(proc.getInputStream)(
+      Source.fromInputStream(_, "UTF-8").getLines().foreach { line =>
+        logInfo(line)
+        outputLines += line
+      })
+    assert(proc.waitFor(timeout, TimeUnit.SECONDS),
+      s"Timed out while executing ${fullCommand.mkString(" ")}")
+    assert(proc.exitValue == 0, s"Failed to execute ${fullCommand.mkString(" ")}")
+    outputLines
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
new file mode 100644
index 0000000..f1fd6dc
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.util.concurrent.TimeUnit
+
+import com.google.common.util.concurrent.SettableFuture
+import io.fabric8.kubernetes.api.model.HasMetadata
+import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
+import io.fabric8.kubernetes.client.Watcher.Action
+import io.fabric8.kubernetes.client.internal.readiness.Readiness
+
+private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] {
+
+  private val signal = SettableFuture.create[Boolean]
+
+  override def eventReceived(action: Action, resource: T): Unit = {
+    if ((action == Action.MODIFIED || action == Action.ADDED) &&
+        Readiness.isReady(resource)) {
+      signal.set(true)
+    }
+  }
+
+  override def onClose(cause: KubernetesClientException): Unit = {}
+
+  def waitUntilReady(): Boolean = signal.get(60, TimeUnit.SECONDS)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
new file mode 100644
index 0000000..663f8b6
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.Closeable
+import java.net.URI
+
+import org.apache.spark.internal.Logging
+
+object Utils extends Logging {
+
+  def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
+    val resource = createResource
+    try f.apply(resource) finally resource.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
new file mode 100644
index 0000000..284712c
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/IntegrationTestBackend.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.k8s.integrationtest.backend
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import org.apache.spark.deploy.k8s.integrationtest.backend.minikube.MinikubeTestBackend
+
+private[spark] trait IntegrationTestBackend {
+  def initialize(): Unit
+  def getKubernetesClient: DefaultKubernetesClient
+  def cleanUp(): Unit = {}
+}
+
+private[spark] object IntegrationTestBackendFactory {
+  val deployModeConfigKey = "spark.kubernetes.test.deployMode"
+
+  def getTestBackend: IntegrationTestBackend = {
+    val deployMode = Option(System.getProperty(deployModeConfigKey))
+      .getOrElse("minikube")
+    if (deployMode == "minikube") {
+      MinikubeTestBackend
+    } else {
+      throw new IllegalArgumentException(
+        "Invalid " + deployModeConfigKey + ": " + deployMode)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
new file mode 100644
index 0000000..6494cbc
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/Minikube.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
+
+import java.io.File
+import java.nio.file.Paths
+
+import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient}
+
+import org.apache.spark.deploy.k8s.integrationtest.ProcessUtils
+import org.apache.spark.internal.Logging
+
+// TODO support windows
+private[spark] object Minikube extends Logging {
+
+  private val MINIKUBE_STARTUP_TIMEOUT_SECONDS = 60
+
+  def getMinikubeIp: String = {
+    val outputs = executeMinikube("ip")
+      .filter(_.matches("^\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}$"))
+    assert(outputs.size == 1, "Unexpected amount of output from minikube ip")
+    outputs.head
+  }
+
+  def getMinikubeStatus: MinikubeStatus.Value = {
+    val statusString = executeMinikube("status")
+      .filter(line => line.contains("minikubeVM: ") || 
line.contains("minikube:"))
+      .head
+      .replaceFirst("minikubeVM: ", "")
+      .replaceFirst("minikube: ", "")
+    MinikubeStatus.unapply(statusString)
+        .getOrElse(throw new IllegalStateException(s"Unknown status $statusString"))
+  }
+
+  def getKubernetesClient: DefaultKubernetesClient = {
+    val kubernetesMaster = s"https://${getMinikubeIp}:8443"
+    val userHome = System.getProperty("user.home")
+    val kubernetesConf = new ConfigBuilder()
+      .withApiVersion("v1")
+      .withMasterUrl(kubernetesMaster)
+      .withCaCertFile(Paths.get(userHome, ".minikube", "ca.crt").toFile.getAbsolutePath)
+      .withClientCertFile(Paths.get(userHome, ".minikube", "apiserver.crt").toFile.getAbsolutePath)
+      .withClientKeyFile(Paths.get(userHome, ".minikube", "apiserver.key").toFile.getAbsolutePath)
+      .build()
+    new DefaultKubernetesClient(kubernetesConf)
+  }
+
+  private def executeMinikube(action: String, args: String*): Seq[String] = {
+    ProcessUtils.executeProcess(
+      Array("bash", "-c", s"minikube $action") ++ args, 
MINIKUBE_STARTUP_TIMEOUT_SECONDS)
+  }
+}
+
+private[spark] object MinikubeStatus extends Enumeration {
+
+  // The following states are listed according to
+  // https://github.com/docker/machine/blob/master/libmachine/state/state.go.
+  val STARTING = status("Starting")
+  val RUNNING = status("Running")
+  val PAUSED = status("Paused")
+  val STOPPING = status("Stopping")
+  val STOPPED = status("Stopped")
+  val ERROR = status("Error")
+  val TIMEOUT = status("Timeout")
+  val SAVED = status("Saved")
+  val NONE = status("")
+
+  def status(value: String): Value = new Val(nextId, value)
+  def unapply(s: String): Option[Value] = values.find(s == _.toString)
+}
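
The backend above shells out to the minikube CLI rather than using a client
library, so the two commands it parses can be sanity-checked by hand before a
run (status output varies across Minikube versions, which is why the parser
accepts both the "minikubeVM:" and "minikube:" prefixes):

    minikube status
    minikube ip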

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
new file mode 100644
index 0000000..cb93241
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/minikube/MinikubeTestBackend.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest.backend.minikube
+
+import io.fabric8.kubernetes.client.DefaultKubernetesClient
+
+import org.apache.spark.deploy.k8s.integrationtest.backend.IntegrationTestBackend
+
+private[spark] object MinikubeTestBackend extends IntegrationTestBackend {
+
+  private var defaultClient: DefaultKubernetesClient = _
+
+  override def initialize(): Unit = {
+    val minikubeStatus = Minikube.getMinikubeStatus
+    require(minikubeStatus == MinikubeStatus.RUNNING,
+        s"Minikube must be running to use the Minikube backend for integration 
tests." +
+          s" Current status is: $minikubeStatus.")
+    defaultClient = Minikube.getKubernetesClient
+  }
+
+  override def cleanUp(): Unit = {
+    super.cleanUp()
+  }
+
+  override def getKubernetesClient: DefaultKubernetesClient = {
+    defaultClient
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
new file mode 100644
index 0000000..a81ef45
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/config.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.File
+
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+
+package object config {
+  def getTestImageTag: String = {
+    val imageTagFileProp = System.getProperty("spark.kubernetes.test.imageTagFile")
+    require(imageTagFileProp != null, "Image tag file must be provided in system properties.")
+    val imageTagFile = new File(imageTagFileProp)
+    require(imageTagFile.isFile, s"No file found for image tag at ${imageTagFile.getAbsolutePath}.")
+    Files.toString(imageTagFile, Charsets.UTF_8).trim
+  }
+
+  def getTestImageRepo: String = {
+    val imageRepo = System.getProperty("spark.kubernetes.test.imageRepo")
+    require(imageRepo != null, "Image repo must be provided in system properties.")
+    imageRepo
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f433ef78/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
----------------------------------------------------------------------
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
new file mode 100644
index 0000000..0807a68
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/constants.scala
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+package object constants {
+  val MINIKUBE_TEST_BACKEND = "minikube"
+  val GCE_TEST_BACKEND = "gce"
+}

