ZEPPELIN-3254. Enable one build of zeppelin to work with multiple versions of 
spark

### What is this PR for?
This PR adds tests so that a single build of Zeppelin can work with multiple 
versions of Spark. There are two main system tests (a condensed sketch of the 
version-parameterized setup follows the list):
* `SparkIntegrationTest` (launches the Spark interpreter from the interpreter 
setting component in three modes: local, yarn-client, and yarn-cluster)
* `ZeppelinSparkClusterTest` (launches the Spark interpreter in local mode from 
the paragraph side)
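
Both tests rely on downloading a Spark distribution per version under test and 
pointing the `spark` interpreter setting's `SPARK_HOME` at it before running 
paragraphs. A minimal sketch of that pattern, condensed from the new 
`ZeppelinSparkClusterTest` in the diff below (the version list and the 
`SparkDownloadUtils` helper come from this change; the interpreter wiring and 
real assertions are trimmed for brevity):

```java
import static org.junit.Assert.assertNotNull;

import java.util.Arrays;
import java.util.List;

import org.apache.zeppelin.interpreter.SparkDownloadUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Sketch only: mirrors how ZeppelinSparkClusterTest is parameterized over
// Spark versions in this change; interpreter setup and real tests are omitted.
@RunWith(Parameterized.class)
public class SparkVersionMatrixSketch {

  private final String sparkVersion;
  private final String sparkHome;

  @Parameterized.Parameters
  public static List<Object[]> data() {
    // One Zeppelin build, several Spark versions under test.
    return Arrays.asList(new Object[][]{
        {"2.2.1"}, {"2.1.2"}, {"2.0.2"}, {"1.6.3"}
    });
  }

  public SparkVersionMatrixSketch(String sparkVersion) throws Exception {
    this.sparkVersion = sparkVersion;
    // Downloads (and caches) the requested Spark distribution and returns its home dir.
    this.sparkHome = SparkDownloadUtils.downloadSpark(sparkVersion);
    // The real test then sets SPARK_HOME on the "spark" interpreter setting and
    // restarts the interpreter before running paragraphs against that version.
  }

  @Test
  public void sparkHomeIsResolvedPerVersion() {
    // Placeholder check; the real tests run Spark paragraphs through the notebook API.
    assertNotNull(sparkVersion);
    assertNotNull(sparkHome);
  }
}
```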

### What type of PR is it?
[Improvement]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-3254

### How should this be tested?
* CI pass (a sketch for reproducing one of the new CI jobs locally follows)
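
Something along these lines should reproduce the python2 job locally (a sketch 
assuming Maven and the `-Pspark-2.2` profile; the flags are copied from the 
`BUILD_FLAG`/`TEST_FLAG` values in the new `.travis.yml` entries below and may 
need adjustment for your environment):

```bash
# Build the required modules, then run only ZeppelinSparkClusterTest,
# mirroring the python2 job added to .travis.yml in this change.
mvn package -DskipTests -DskipRat -am -Pspark-2.2 \
    -pl zeppelin-zengine,zeppelin-server,spark/interpreter,spark/spark-dependencies
mvn test -DskipRat -am -Pspark-2.2 \
    -pl zeppelin-zengine,zeppelin-server,spark/interpreter,spark/spark-dependencies \
    -Dtest=ZeppelinSparkClusterTest -DfailIfNoTests=false
```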

### Screenshots (if appropriate)

### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No

Author: Jeff Zhang <zjf...@apache.org>

Closes #2832 from zjffdu/ZEPPELIN-3254 and squashes the following commits:

3b06ed9 [Jeff Zhang] ZEPPELIN-3254. Enable one build of zeppelin to work with 
multiple versions of spark


Project: http://git-wip-us.apache.org/repos/asf/zeppelin/repo
Commit: http://git-wip-us.apache.org/repos/asf/zeppelin/commit/d6cdd56d
Tree: http://git-wip-us.apache.org/repos/asf/zeppelin/tree/d6cdd56d
Diff: http://git-wip-us.apache.org/repos/asf/zeppelin/diff/d6cdd56d

Branch: refs/heads/master
Commit: d6cdd56d8fbc551dee7c334eb403c19bc65d6e1f
Parents: d90716d
Author: Jeff Zhang <zjf...@apache.org>
Authored: Thu Feb 22 21:05:23 2018 +0800
Committer: Jeff Zhang <zjf...@apache.org>
Committed: Tue Mar 6 09:57:18 2018 +0800

----------------------------------------------------------------------
 .travis.yml                                     |   30 +-
 .../zeppelin/spark/SparkRInterpreter.java       |    8 +-
 .../interpreter/InterpreterProperty.java        |    1 +
 zeppelin-server/notebook/.python.recovery.crc   |  Bin 12 -> 0 bytes
 zeppelin-server/notebook/python.recovery        |    1 -
 zeppelin-server/pom.xml                         |    9 +
 .../zeppelin/rest/AbstractTestRestApi.java      |  211 +---
 .../zeppelin/rest/ZeppelinSparkClusterTest.java | 1140 +++++++++---------
 .../src/test/resources/log4j.properties         |    3 +-
 zeppelin-zengine/pom.xml                        |   14 +
 .../interpreter/SparkDownloadUtils.java         |   91 ++
 .../interpreter/SparkIntegrationTest.java       |  190 +++
 .../interpreter/SparkInterpreterModeTest.java   |  163 ---
 .../src/test/resources/log4j.properties         |    6 +-
 14 files changed, 984 insertions(+), 883 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index bc568fc..aba2944 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -66,7 +66,7 @@ matrix:
     - sudo: required
       jdk: "oraclejdk8"
       dist: trusty
-      env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" 
SPARKR="true" PROFILE="-Pspark-2.2 -Pscalding -Phelium-dev -Pexamples 
-Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify 
-Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" 
TEST_PROJECTS="-Dtests.to.exclude=**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java
 -DfailIfNoTests=false"
+      env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" 
SPARKR="true" PROFILE="-Pspark-2.2 -Pscalding -Phelium-dev -Pexamples 
-Pscala-2.11" BUILD_FLAG="package -Pbuild-distr -DskipRat" TEST_FLAG="verify 
-Pusing-packaged-distr -DskipRat" MODULES="-pl ${INTERPRETERS}" 
TEST_PROJECTS="-Dtests.to.exclude=**/SparkIntegrationTest.java,**/ZeppelinSparkClusterTest.java,**/org/apache/zeppelin/spark/*,**/HeliumApplicationFactoryTest.java
 -DfailIfNoTests=false"
 
     # Test selenium with spark module for 1.6.3
     - jdk: "oraclejdk8"
@@ -78,32 +78,46 @@ matrix:
     # Test interpreter modules
     - jdk: "openjdk7"
       dist: trusty
-      env: PYTHON="3" SCALA_VER="2.10" PROFILE="-Pscalding" 
BUILD_FLAG="install -DskipTests -DskipRat -Pr" TEST_FLAG="test -DskipRat" 
MODULES="-pl $(echo .,zeppelin-interpreter,${INTERPRETERS} | sed 's/!//g')" 
TEST_PROJECTS=""
+      env: PYTHON="3" SCALA_VER="2.10" PROFILE="-Pscalding" 
BUILD_FLAG="package -DskipTests -DskipRat -Pr" TEST_FLAG="test -DskipRat" 
MODULES="-pl $(echo .,zeppelin-interpreter,${INTERPRETERS} | sed 's/!//g')" 
TEST_PROJECTS=""
+
+    # Running ZeppelinSparkClusterTest & SparkIntegrationTest in one build would 
exceed the travis time limit, so run them separately
+    
+    # Test spark interpreter with different spark versions under python2, only 
run ZeppelinSparkClusterTest
+    - sudo: required
+      jdk: "oraclejdk8"
+      dist: trusty
+      env: PYTHON="2" PROFILE="-Pspark-2.2" SPARKR="true" BUILD_FLAG="package 
-DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl 
zeppelin-zengine,zeppelin-server,spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest -DfailIfNoTests=false"
+
+    # Test spark interpreter with different spark versions under python3, only 
run SparkIntegrationTest
+    - sudo: required
+      jdk: "oraclejdk8"
+      dist: trusty
+      env: PYTHON="3" PROFILE="-Pspark-2.2" SPARKR="true" BUILD_FLAG="package 
-DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl 
zeppelin-zengine,spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=SparkIntegrationTest -DfailIfNoTests=false"
 
     # Test spark module for 2.2.0 with scala 2.11
     - jdk: "oraclejdk8"
       dist: trusty
-      env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" 
PROFILE="-Pspark-2.2 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" 
BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* 
-DfailIfNoTests=false"
+      env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.2.0" HADOOP_VER="2.6" 
PROFILE="-Pspark-2.2 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" 
BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
     # Test spark module for 2.1.0 with scala 2.11
     - jdk: "openjdk7"
       dist: trusty
-      env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" 
PROFILE="-Pspark-2.1 -Phadoop2 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" 
BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* 
-DfailIfNoTests=false"
+      env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.1.0" HADOOP_VER="2.6" 
PROFILE="-Pspark-2.1 -Phadoop2 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" 
BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
     # Test spark module for 2.0.2 with scala 2.11
     - jdk: "oraclejdk8"
       dist: trusty
-      env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" 
PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" 
BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* 
-DfailIfNoTests=false"
+      env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="2.0.2" HADOOP_VER="2.6" 
PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" 
BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
     # Test spark module for 1.6.3 with scala 2.10
     - jdk: "openjdk7"
       dist: trusty
-      env: PYTHON="3" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" 
PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" 
BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* 
-DfailIfNoTests=false"
+      env: PYTHON="3" SCALA_VER="2.10" SPARK_VER="1.6.3" HADOOP_VER="2.6" 
PROFILE="-Pspark-1.6 -Phadoop2 -Phadoop-2.6 -Pscala-2.10" SPARKR="true" 
BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
     # Test spark module for 1.6.3 with scala 2.11
     - jdk: "oraclejdk8"
       dist: trusty
-      env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" 
PROFILE="-Pspark-1.6 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" 
BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl zeppelin-server,spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest,org.apache.zeppelin.spark.* 
-DfailIfNoTests=false"
+      env: PYTHON="2" SCALA_VER="2.11" SPARK_VER="1.6.3" HADOOP_VER="2.6" 
PROFILE="-Pspark-1.6 -Phadoop3 -Phadoop-2.6 -Pscala-2.11" SPARKR="true" 
BUILD_FLAG="package -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" 
MODULES="-pl spark/interpreter,spark/spark-dependencies" 
TEST_PROJECTS="-Dtest=org.apache.zeppelin.spark.* -DfailIfNoTests=false"
 
     # Test python/pyspark with python 2, livy 0.5
     - sudo: required
@@ -116,7 +130,7 @@ matrix:
       dist: trusty
       jdk: "openjdk7"
       env: PYTHON="3" SCALA_VER="2.11" SPARK_VER="2.0.0" HADOOP_VER="2.6" 
LIVY_VER="0.5.0-incubating" PROFILE="-Pspark-2.0 -Phadoop3 -Phadoop-2.6 
-Pscala-2.11" BUILD_FLAG="install -am -DskipTests -DskipRat" TEST_FLAG="verify 
-DskipRat" MODULES="-pl livy" TEST_PROJECTS="-Dpyspark.test.exclude='' 
-DfailIfNoTests=false"
-      
+
 before_install:
   # check files included in commit range, clear bower_components if a 
bower.json file has changed.
   # bower cache clearing can also be forced by putting "bower clear" or "clear 
bower" in a commit message

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
----------------------------------------------------------------------
diff --git 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
index dbaeabe..5efff0e 100644
--- 
a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
+++ 
b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java
@@ -59,12 +59,18 @@ public class SparkRInterpreter extends Interpreter {
     String sparkRLibPath;
 
     if (System.getenv("SPARK_HOME") != null) {
+      // local or yarn-client mode when SPARK_HOME is specified
       sparkRLibPath = System.getenv("SPARK_HOME") + "/R/lib";
-    } else {
+    } else if (System.getenv("ZEPPELIN_HOME") != null){
+      // embedded mode when SPARK_HOME is not specified
       sparkRLibPath = System.getenv("ZEPPELIN_HOME") + 
"/interpreter/spark/R/lib";
       // workaround to make sparkr work without SPARK_HOME
       System.setProperty("spark.test.home", System.getenv("ZEPPELIN_HOME") + 
"/interpreter/spark");
+    } else {
+      // yarn-cluster mode
+      sparkRLibPath = "sparkr";
     }
+
     synchronized (SparkRBackend.backend()) {
       if (!SparkRBackend.isStarted()) {
         SparkRBackend.init();

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
index 0bb3d42..92cf3a8 100644
--- 
a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
+++ 
b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterProperty.java
@@ -34,6 +34,7 @@ public class InterpreterProperty {
   public InterpreterProperty(String name, Object value) {
     this.name = name;
     this.value = value;
+    this.type = InterpreterPropertyType.TEXTAREA.getValue();
   }
 
   public String getName() {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/notebook/.python.recovery.crc
----------------------------------------------------------------------
diff --git a/zeppelin-server/notebook/.python.recovery.crc 
b/zeppelin-server/notebook/.python.recovery.crc
deleted file mode 100644
index 6bd3e7a..0000000
Binary files a/zeppelin-server/notebook/.python.recovery.crc and /dev/null 
differ

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/notebook/python.recovery
----------------------------------------------------------------------
diff --git a/zeppelin-server/notebook/python.recovery 
b/zeppelin-server/notebook/python.recovery
deleted file mode 100644
index eaf4938..0000000
--- a/zeppelin-server/notebook/python.recovery
+++ /dev/null
@@ -1 +0,0 @@
-2CZA1DVUG:shared_process       192.168.3.2:55410
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-server/pom.xml b/zeppelin-server/pom.xml
index 970f302..1622791 100644
--- a/zeppelin-server/pom.xml
+++ b/zeppelin-server/pom.xml
@@ -341,6 +341,15 @@
           </exclusion>
         </exclusions>
       </dependency>
+
+    <dependency>
+      <groupId>org.apache.zeppelin</groupId>
+      <artifactId>zeppelin-zengine</artifactId>
+      <version>${project.version}</version>
+      <classifier>tests</classifier>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
   <build>
     <plugins>

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
index 5193420..dfb5ac2 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/AbstractTestRestApi.java
@@ -63,72 +63,70 @@ public abstract class AbstractTestRestApi {
   static final String restApiUrl = "/api";
   static final String url = getUrlToTest();
   protected static final boolean wasRunning = checkIfServerIsRunning();
-  static boolean pySpark = false;
-  static boolean sparkR = false;
   static boolean isRunningWithAuth = false;
 
   private static File shiroIni = null;
   private static String zeppelinShiro =
       "[users]\n" +
-      "admin = password1, admin\n" +
-      "user1 = password2, role1, role2\n" +
-      "user2 = password3, role3\n" +
-      "user3 = password4, role2\n" +
-      "[main]\n" +
-      "sessionManager = 
org.apache.shiro.web.session.mgt.DefaultWebSessionManager\n" +
-      "securityManager.sessionManager = $sessionManager\n" +
-      "securityManager.sessionManager.globalSessionTimeout = 86400000\n" +
-      "shiro.loginUrl = /api/login\n" +
-      "[roles]\n" +
-      "role1 = *\n" +
-      "role2 = *\n" +
-      "role3 = *\n" +
-      "admin = *\n" +
-      "[urls]\n" +
-      "/api/version = anon\n" +
-      "/** = authc";
+          "admin = password1, admin\n" +
+          "user1 = password2, role1, role2\n" +
+          "user2 = password3, role3\n" +
+          "user3 = password4, role2\n" +
+          "[main]\n" +
+          "sessionManager = 
org.apache.shiro.web.session.mgt.DefaultWebSessionManager\n" +
+          "securityManager.sessionManager = $sessionManager\n" +
+          "securityManager.sessionManager.globalSessionTimeout = 86400000\n" +
+          "shiro.loginUrl = /api/login\n" +
+          "[roles]\n" +
+          "role1 = *\n" +
+          "role2 = *\n" +
+          "role3 = *\n" +
+          "admin = *\n" +
+          "[urls]\n" +
+          "/api/version = anon\n" +
+          "/** = authc";
 
   private static String zeppelinShiroKnox =
       "[users]\n" +
-      "admin = password1, admin\n" +
-      "user1 = password2, role1, role2\n" +
-      "[main]\n" +
-      "knoxJwtRealm = org.apache.zeppelin.realm.jwt.KnoxJwtRealm\n" +
-      "knoxJwtRealm.providerUrl = https://domain.example.com/\n"; +
-      "knoxJwtRealm.login = gateway/knoxsso/knoxauth/login.html\n" +
-      "knoxJwtRealm.logout = gateway/knoxssout/api/v1/webssout\n" +
-      "knoxJwtRealm.redirectParam = originalUrl\n" +
-      "knoxJwtRealm.cookieName = hadoop-jwt\n" +
-      "knoxJwtRealm.publicKeyPath = knox-sso.pem\n" +
-      "authc = org.apache.zeppelin.realm.jwt.KnoxAuthenticationFilter\n" +
-      "sessionManager = 
org.apache.shiro.web.session.mgt.DefaultWebSessionManager\n" +
-      "securityManager.sessionManager = $sessionManager\n" +
-      "securityManager.sessionManager.globalSessionTimeout = 86400000\n" +
-      "shiro.loginUrl = /api/login\n" +
-      "[roles]\n" +
-      "admin = *\n" +
-      "[urls]\n" +
-      "/api/version = anon\n" +
-      "/** = authc";
+          "admin = password1, admin\n" +
+          "user1 = password2, role1, role2\n" +
+          "[main]\n" +
+          "knoxJwtRealm = org.apache.zeppelin.realm.jwt.KnoxJwtRealm\n" +
+          "knoxJwtRealm.providerUrl = https://domain.example.com/\n"; +
+          "knoxJwtRealm.login = gateway/knoxsso/knoxauth/login.html\n" +
+          "knoxJwtRealm.logout = gateway/knoxssout/api/v1/webssout\n" +
+          "knoxJwtRealm.redirectParam = originalUrl\n" +
+          "knoxJwtRealm.cookieName = hadoop-jwt\n" +
+          "knoxJwtRealm.publicKeyPath = knox-sso.pem\n" +
+          "authc = org.apache.zeppelin.realm.jwt.KnoxAuthenticationFilter\n" +
+          "sessionManager = 
org.apache.shiro.web.session.mgt.DefaultWebSessionManager\n" +
+          "securityManager.sessionManager = $sessionManager\n" +
+          "securityManager.sessionManager.globalSessionTimeout = 86400000\n" +
+          "shiro.loginUrl = /api/login\n" +
+          "[roles]\n" +
+          "admin = *\n" +
+          "[urls]\n" +
+          "/api/version = anon\n" +
+          "/** = authc";
 
   private static File knoxSsoPem = null;
   private static String KNOX_SSO_PEM =
       "-----BEGIN CERTIFICATE-----\n"
-      + "MIIChjCCAe+gAwIBAgIJALYrdDEXKwcqMA0GCSqGSIb3DQEBBQUAMIGEMQswCQYD\n"
-      + "VQQGEwJVUzENMAsGA1UECBMEVGVzdDENMAsGA1UEBxMEVGVzdDEPMA0GA1UEChMG\n"
-      + "SGFkb29wMQ0wCwYDVQQLEwRUZXN0MTcwNQYDVQQDEy5jdHItZTEzNS0xNTEyMDY5\n"
-      + "MDMyOTc1LTU0NDctMDEtMDAwMDAyLmh3eC5zaXRlMB4XDTE3MTIwNDA5NTIwMFoX\n"
-      + "DTE4MTIwNDA5NTIwMFowgYQxCzAJBgNVBAYTAlVTMQ0wCwYDVQQIEwRUZXN0MQ0w\n"
-      + "CwYDVQQHEwRUZXN0MQ8wDQYDVQQKEwZIYWRvb3AxDTALBgNVBAsTBFRlc3QxNzA1\n"
-      + "BgNVBAMTLmN0ci1lMTM1LTE1MTIwNjkwMzI5NzUtNTQ0Ny0wMS0wMDAwMDIuaHd4\n"
-      + "LnNpdGUwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAILFoXdz3yCy2INncYM2\n"
-      + "y72fYrONoQIxeeIzeJIibXLTuowSju90Q6aThSyUsQ6NEia2flnlKiCgINTNAodh\n"
-      + "UPUVGyGT+NMrqJzzpXAll2UUa6gIUPnXYEzYNkMIpbQOAo5BAg7YamaidbPPiT3W\n"
-      + "wAD1rWo3AMUY+nZJrAi4dEH5AgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAB0R07/lo\n"
-      + "4hD+WeDEeyLTnsbFnPNXxBT1APMUmmuCjcky/19ZB8OphqTKIITONdOK/XHdjZHG\n"
-      + "JDOfhBkVknL42lSi45ahUAPS2PZOlQL08MbS8xajP1faterm+aHcdwJVK9dK76RB\n"
-      + "/bA8TFNPblPxavIOcd+R+RfFmT1YKfYIhco=\n"
-      + "-----END CERTIFICATE-----";
+          + 
"MIIChjCCAe+gAwIBAgIJALYrdDEXKwcqMA0GCSqGSIb3DQEBBQUAMIGEMQswCQYD\n"
+          + 
"VQQGEwJVUzENMAsGA1UECBMEVGVzdDENMAsGA1UEBxMEVGVzdDEPMA0GA1UEChMG\n"
+          + 
"SGFkb29wMQ0wCwYDVQQLEwRUZXN0MTcwNQYDVQQDEy5jdHItZTEzNS0xNTEyMDY5\n"
+          + 
"MDMyOTc1LTU0NDctMDEtMDAwMDAyLmh3eC5zaXRlMB4XDTE3MTIwNDA5NTIwMFoX\n"
+          + 
"DTE4MTIwNDA5NTIwMFowgYQxCzAJBgNVBAYTAlVTMQ0wCwYDVQQIEwRUZXN0MQ0w\n"
+          + 
"CwYDVQQHEwRUZXN0MQ8wDQYDVQQKEwZIYWRvb3AxDTALBgNVBAsTBFRlc3QxNzA1\n"
+          + 
"BgNVBAMTLmN0ci1lMTM1LTE1MTIwNjkwMzI5NzUtNTQ0Ny0wMS0wMDAwMDIuaHd4\n"
+          + 
"LnNpdGUwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAILFoXdz3yCy2INncYM2\n"
+          + 
"y72fYrONoQIxeeIzeJIibXLTuowSju90Q6aThSyUsQ6NEia2flnlKiCgINTNAodh\n"
+          + 
"UPUVGyGT+NMrqJzzpXAll2UUa6gIUPnXYEzYNkMIpbQOAo5BAg7YamaidbPPiT3W\n"
+          + 
"wAD1rWo3AMUY+nZJrAi4dEH5AgMBAAEwDQYJKoZIhvcNAQEFBQADgYEAB0R07/lo\n"
+          + 
"4hD+WeDEeyLTnsbFnPNXxBT1APMUmmuCjcky/19ZB8OphqTKIITONdOK/XHdjZHG\n"
+          + 
"JDOfhBkVknL42lSi45ahUAPS2PZOlQL08MbS8xajP1faterm+aHcdwJVK9dK76RB\n"
+          + "/bA8TFNPblPxavIOcd+R+RfFmT1YKfYIhco=\n"
+          + "-----END CERTIFICATE-----";
 
   protected static File zeppelinHome;
   protected static File confDir;
@@ -159,7 +157,7 @@ public abstract class AbstractTestRestApi {
     @Override
     public void run() {
       try {
-        ZeppelinServer.main(new String[] {""});
+        ZeppelinServer.main(new String[]{""});
       } catch (Exception e) {
         LOG.error("Exception in WebDriverManager while getWebDriver ", e);
         throw new RuntimeException(e);
@@ -190,7 +188,7 @@ public abstract class AbstractTestRestApi {
         isRunningWithAuth = true;
         // Set Anonymous session to false.
         
System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_ANONYMOUS_ALLOWED.getVarName(),
 "false");
-        
+
         // Create a shiro env test.
         shiroIni = new File(confDir, "shiro.ini");
         if (!shiroIni.exists()) {
@@ -246,66 +244,10 @@ public abstract class AbstractTestRestApi {
         throw new RuntimeException("Can not start Zeppelin server");
       }
       LOG.info("Test Zeppelin stared.");
-
-
-      // assume first one is spark
-      InterpreterSetting sparkIntpSetting = null;
-      for(InterpreterSetting intpSetting :
-          ZeppelinServer.notebook.getInterpreterSettingManager().get()) {
-        if (intpSetting.getName().equals("spark")) {
-          sparkIntpSetting = intpSetting;
-        }
-      }
-
-      Map<String, InterpreterProperty> sparkProperties =
-          (Map<String, InterpreterProperty>) sparkIntpSetting.getProperties();
-      // ci environment runs spark cluster for testing
-      // so configure zeppelin use spark cluster
-      if ("true".equals(System.getenv("CI"))) {
-        // set spark master and other properties
-        sparkProperties.put("master",
-            new InterpreterProperty("master", "local[2]", 
InterpreterPropertyType.TEXTAREA.getValue()));
-        sparkProperties.put("spark.master",
-            new InterpreterProperty("spark.master", "local[2]", 
InterpreterPropertyType.TEXTAREA.getValue()));
-        sparkProperties.put("spark.cores.max",
-            new InterpreterProperty("spark.cores.max", "2", 
InterpreterPropertyType.TEXTAREA.getValue()));
-        sparkProperties.put("zeppelin.spark.useHiveContext",
-            new InterpreterProperty("zeppelin.spark.useHiveContext", false, 
InterpreterPropertyType.CHECKBOX.getValue()));
-        sparkProperties.put("zeppelin.pyspark.useIPython",  new 
InterpreterProperty("zeppelin.pyspark.useIPython", "false", 
InterpreterPropertyType.TEXTAREA.getValue()));
-        sparkProperties.put("zeppelin.spark.test", new 
InterpreterProperty("zeppelin.spark.test", "true", 
InterpreterPropertyType.TEXTAREA.getValue()));
-        sparkIntpSetting.setProperties(sparkProperties);
-        pySpark = true;
-        sparkR = true;
-        
ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId());
-      } else {
-        String sparkHome = getSparkHome();
-        LOG.info("SPARK HOME detected " + sparkHome);
-        if (sparkHome != null) {
-          if (System.getenv("SPARK_MASTER") != null) {
-            sparkProperties.put("master",
-                new InterpreterProperty("master", 
System.getenv("SPARK_MASTER"), InterpreterPropertyType.TEXTAREA.getValue()));
-          } else {
-            sparkProperties.put("master",
-                new InterpreterProperty("master", "local[2]", 
InterpreterPropertyType.TEXTAREA.getValue()));
-          }
-          sparkProperties.put("spark.master",
-              new InterpreterProperty("spark.master", "local[2]", 
InterpreterPropertyType.TEXTAREA.getValue()));
-          sparkProperties.put("spark.cores.max",
-              new InterpreterProperty("spark.cores.max", "2", 
InterpreterPropertyType.TEXTAREA.getValue()));
-          sparkProperties.put("zeppelin.spark.useHiveContext",
-              new InterpreterProperty("zeppelin.spark.useHiveContext", false, 
InterpreterPropertyType.CHECKBOX.getValue()));
-          sparkProperties.put("zeppelin.pyspark.useIPython",  new 
InterpreterProperty("zeppelin.pyspark.useIPython", "false", 
InterpreterPropertyType.TEXTAREA.getValue()));
-          sparkProperties.put("zeppelin.spark.test", new 
InterpreterProperty("zeppelin.spark.test", "true", 
InterpreterPropertyType.TEXTAREA.getValue()));
-
-          pySpark = true;
-          sparkR = true;
-        }
-
-        
ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId());
-      }
     }
   }
 
+
   protected static void startUpWithKnoxEnable(String testClassName) throws 
Exception {
     start(true, testClassName, true);
   }
@@ -327,53 +269,12 @@ public abstract class AbstractTestRestApi {
     }
   }
 
-  private static String getSparkHome() {
-    String sparkHome = System.getenv("SPARK_HOME");
-    if (sparkHome != null) {
-      return sparkHome;
-    }
-    sparkHome = getSparkHomeRecursively(new 
File(System.getProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName())));
-    return sparkHome;
-  }
-
-  boolean isPyspark() {
-    return pySpark;
-  }
-
-  boolean isSparkR() {
-    return sparkR;
-  }
-
-  private static String getSparkHomeRecursively(File dir) {
-    if (dir == null) return null;
-    File files []  = dir.listFiles();
-    if (files == null) return null;
-
-    File homeDetected = null;
-    for (File f : files) {
-      if (isActiveSparkHome(f)) {
-        homeDetected = f;
-        break;
-      }
-    }
-
-    if (homeDetected != null) {
-      return homeDetected.getAbsolutePath();
-    } else {
-      return getSparkHomeRecursively(dir.getParentFile());
-    }
-  }
-
-  private static boolean isActiveSparkHome(File dir) {
-    return 
dir.getName().matches("spark-[0-9\\.]+[A-Za-z-]*-bin-hadoop[0-9\\.]+");
-  }
-
   protected static void shutDown() throws Exception {
     shutDown(true);
   }
 
   protected static void shutDown(final boolean deleteConfDir) throws Exception 
{
-    if (!wasRunning) {
+    if (!wasRunning && ZeppelinServer.notebook != null) {
       // restart interpreter to stop all interpreter processes
       List<InterpreterSetting> settingList = 
ZeppelinServer.notebook.getInterpreterSettingManager().get();
       if (!ZeppelinServer.notebook.getConf().isRecoveryEnabled()) {

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
----------------------------------------------------------------------
diff --git 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
index f3a7099..bda555a 100644
--- 
a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
+++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java
@@ -16,588 +16,630 @@
  */
 package org.apache.zeppelin.rest;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterProperty;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
+import org.apache.zeppelin.interpreter.SparkDownloadUtils;
 import org.apache.zeppelin.notebook.Note;
 import org.apache.zeppelin.notebook.Paragraph;
 import org.apache.zeppelin.scheduler.Job.Status;
 import org.apache.zeppelin.server.ZeppelinServer;
 import org.apache.zeppelin.user.AuthenticationInfo;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test against spark cluster.
- * Spark cluster is started by CI server using testing/startSparkCluster.sh
  */
+@RunWith(value = Parameterized.class)
 public class ZeppelinSparkClusterTest extends AbstractTestRestApi {
-    AuthenticationInfo anonymous;
-
-    @BeforeClass
-    public static void init() throws Exception {
-        
AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName());
-    }
-
-    @AfterClass
-    public static void destroy() throws Exception {
-        AbstractTestRestApi.shutDown();
-    }
-
-    @Before
-    public void setUp() {
-        anonymous = new AuthenticationInfo("anonymous");
+  private static Logger LOGGER = 
LoggerFactory.getLogger(ZeppelinSparkClusterTest.class);
+
+  private String sparkVersion;
+  private AuthenticationInfo anonymous = new AuthenticationInfo("anonymous");
+
+  public ZeppelinSparkClusterTest(String sparkVersion) throws Exception {
+    this.sparkVersion = sparkVersion;
+    LOGGER.info("Testing SparkVersion: " + sparkVersion);
+    String sparkHome = SparkDownloadUtils.downloadSpark(sparkVersion);
+    setupSparkInterpreter(sparkHome);
+    verifySparkVersionNumber();
+  }
+
+  @Parameterized.Parameters
+  public static List<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {"2.2.1"},
+        {"2.1.2"},
+        {"2.0.2"},
+        {"1.6.3"}
+    });
+  }
+
+  public void setupSparkInterpreter(String sparkHome) throws 
InterpreterException {
+    InterpreterSetting sparkIntpSetting = 
ZeppelinServer.notebook.getInterpreterSettingManager()
+        .getInterpreterSettingByName("spark");
+
+    Map<String, InterpreterProperty> sparkProperties =
+        (Map<String, InterpreterProperty>) sparkIntpSetting.getProperties();
+    LOG.info("SPARK HOME detected " + sparkHome);
+    if (System.getenv("SPARK_MASTER") != null) {
+      sparkProperties.put("master",
+          new InterpreterProperty("master", System.getenv("SPARK_MASTER")));
+    } else {
+      sparkProperties.put("master", new InterpreterProperty("master", 
"local[2]"));
     }
-
-    private void waitForFinish(Paragraph p) {
-        while (p.getStatus() != Status.FINISHED
-                && p.getStatus() != Status.ERROR
-                && p.getStatus() != Status.ABORT) {
-            try {
-                Thread.sleep(100);
-            } catch (InterruptedException e) {
-                LOG.error("Exception in WebDriverManager while getWebDriver ", 
e);
-            }
-        }
-    }
-
-    @Test
-    public void scalaOutputTest() throws IOException {
-        // create new note
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-        Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        Map config = p.getConfig();
-        config.put("enabled", true);
-        p.setConfig(config);
-        p.setText("%spark import java.util.Date\n" +
-            "import java.net.URL\n" +
-            "println(\"hello\")\n"
-        );
-        p.setAuthenticationInfo(anonymous);
-        note.run(p.getId());
-        waitForFinish(p);
-        assertEquals(Status.FINISHED, p.getStatus());
-        assertEquals("import java.util.Date\n" +
-            "import java.net.URL\n" +
-            "hello\n", p.getResult().message().get(0).getData());
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
-    }
-
-
-
-    @Test
-    public void basicRDDTransformationAndActionTest() throws IOException {
-        // create new note
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-
-        // run markdown paragraph, again
-        Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        Map config = p.getConfig();
-        config.put("enabled", true);
-        p.setConfig(config);
-        p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))");
-        p.setAuthenticationInfo(anonymous);
-        note.run(p.getId());
-        waitForFinish(p);
-        assertEquals(Status.FINISHED, p.getStatus());
-        assertEquals("55", p.getResult().message().get(0).getData());
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+    sparkProperties.put("SPARK_HOME", new InterpreterProperty("SPARK_HOME", 
sparkHome));
+    sparkProperties.put("spark.master", new 
InterpreterProperty("spark.master", "local[2]"));
+    sparkProperties.put("spark.cores.max",
+        new InterpreterProperty("spark.cores.max", "2"));
+    sparkProperties.put("zeppelin.spark.useHiveContext",
+        new InterpreterProperty("zeppelin.spark.useHiveContext", "false"));
+    sparkProperties.put("zeppelin.pyspark.useIPython", new 
InterpreterProperty("zeppelin.pyspark.useIPython", "false"));
+    sparkProperties.put("zeppelin.spark.useNew", new 
InterpreterProperty("zeppelin.spark.useNew", "true"));
+    sparkProperties.put("zeppelin.spark.test", new 
InterpreterProperty("zeppelin.spark.test", "true"));
+
+    
ZeppelinServer.notebook.getInterpreterSettingManager().restart(sparkIntpSetting.getId());
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
AbstractTestRestApi.startUp(ZeppelinSparkClusterTest.class.getSimpleName());
+  }
+
+  @AfterClass
+  public static void destroy() throws Exception {
+    AbstractTestRestApi.shutDown();
+  }
+
+  private void waitForFinish(Paragraph p) {
+    while (p.getStatus() != Status.FINISHED
+        && p.getStatus() != Status.ERROR
+        && p.getStatus() != Status.ABORT) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        LOG.error("Exception in WebDriverManager while getWebDriver ", e);
+      }
     }
-
-    @Test
-    public void sparkSQLTest() throws IOException {
-        // create new note
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-        int sparkVersion = getSparkVersionNumber(note);
-        // DataFrame API is available from spark 1.3
-        if (sparkVersion >= 13) {
-            // test basic dataframe api
-            Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-            Map config = p.getConfig();
-            config.put("enabled", true);
-            p.setConfig(config);
-            p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
-                    "df.collect()");
-            p.setAuthenticationInfo(anonymous);
-            note.run(p.getId());
-            waitForFinish(p);
-            assertEquals(Status.FINISHED, p.getStatus());
-            assertTrue(p.getResult().message().get(0).getData().contains(
-                    "Array[org.apache.spark.sql.Row] = Array([hello,20])"));
-
-            // test display DataFrame
-            p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-            config = p.getConfig();
-            config.put("enabled", true);
-            p.setConfig(config);
-            p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
-                    "z.show(df)");
-            p.setAuthenticationInfo(anonymous);
-            note.run(p.getId());
-            waitForFinish(p);
-            assertEquals(Status.FINISHED, p.getStatus());
-            assertEquals(InterpreterResult.Type.TABLE, 
p.getResult().message().get(1).getType());
-            assertEquals("_1\t_2\nhello\t20\n", 
p.getResult().message().get(1).getData());
-
-            // test display DataSet
-            if (sparkVersion >= 20) {
-                p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-                config = p.getConfig();
-                config.put("enabled", true);
-                p.setConfig(config);
-                p.setText("%spark val 
ds=spark.createDataset(Seq((\"hello\",20)))\n" +
-                        "z.show(ds)");
-                p.setAuthenticationInfo(anonymous);
-                note.run(p.getId());
-                waitForFinish(p);
-                assertEquals(Status.FINISHED, p.getStatus());
-                assertEquals(InterpreterResult.Type.TABLE, 
p.getResult().message().get(1).getType());
-                assertEquals("_1\t_2\nhello\t20\n", 
p.getResult().message().get(1).getData());
-            }
-        }
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void scalaOutputTest() throws IOException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%spark import java.util.Date\n" +
+        "import java.net.URL\n" +
+        "println(\"hello\")\n"
+    );
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals("import java.util.Date\n" +
+        "import java.net.URL\n" +
+        "hello\n", p.getResult().message().get(0).getData());
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+
+  @Test
+  public void basicRDDTransformationAndActionTest() throws IOException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+
+    // run markdown paragraph, again
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%spark print(sc.parallelize(1 to 10).reduce(_ + _))");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals("55", p.getResult().message().get(0).getData());
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void sparkSQLTest() throws IOException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    // test basic dataframe api
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
+        "df.collect()");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertTrue(p.getResult().message().get(0).getData().contains(
+        "Array[org.apache.spark.sql.Row] = Array([hello,20])"));
+
+    // test display DataFrame
+    p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%spark val 
df=sqlContext.createDataFrame(Seq((\"hello\",20)))\n" +
+        "z.show(df)");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals(InterpreterResult.Type.TABLE, 
p.getResult().message().get(1).getType());
+    assertEquals("_1\t_2\nhello\t20\n", 
p.getResult().message().get(1).getData());
+
+    // test display DataSet
+    if (isSpark2()) {
+      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      p.setText("%spark val ds=spark.createDataset(Seq((\"hello\",20)))\n" +
+          "z.show(ds)");
+      p.setAuthenticationInfo(anonymous);
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals(InterpreterResult.Type.TABLE, 
p.getResult().message().get(1).getType());
+      assertEquals("_1\t_2\nhello\t20\n", 
p.getResult().message().get(1).getData());
     }
 
-    @Test
-    public void sparkRTest() throws IOException, InterpreterException {
-      // create new note
-      Note note = ZeppelinServer.notebook.createNote(anonymous);
-      int sparkVersion = getSparkVersionNumber(note);
-
-      if (isSparkR() && sparkVersion >= 14) {   // sparkr supported from 1.4.0
-        // restart spark interpreter
-        List<InterpreterSetting> settings =
-          ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId());
-
-        for (InterpreterSetting setting : settings) {
-          if (setting.getName().equals("spark")) {
-            
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
-            break;
-          }
-        }
-
-        String sqlContextName = "sqlContext";
-        if (sparkVersion >= 20) {
-          sqlContextName = "spark";
-        }
-        Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        Map config = p.getConfig();
-        config.put("enabled", true);
-        p.setConfig(config);
-        p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), 
age=c(19, 23, 18))\n" +
-            "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
-            "count(df)"
-        );
-        p.setAuthenticationInfo(anonymous);
-        note.run(p.getId());
-        waitForFinish(p);
-        System.err.println("sparkRTest=" + 
p.getResult().message().get(0).getData());
-        assertEquals(Status.FINISHED, p.getStatus());
-        assertEquals("[1] 3", p.getResult().message().get(0).getData().trim());
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void sparkRTest() throws IOException, InterpreterException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    // restart spark interpreter
+    List<InterpreterSetting> settings =
+        ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId());
+
+    for (InterpreterSetting setting : settings) {
+      if (setting.getName().equals("spark")) {
+        
ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
+        break;
       }
-      ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
     }
 
-    @Test
-    public void pySparkTest() throws IOException {
-        // create new note
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-        note.setName("note");
-        int sparkVersion = getSparkVersionNumber(note);
-
-        if (isPyspark() && sparkVersion >= 12) {   // pyspark supported from 
1.2.1
-            // run markdown paragraph, again
-            Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-            Map config = p.getConfig();
-            config.put("enabled", true);
-            p.setConfig(config);
-            p.setText("%pyspark print(sc.parallelize(range(1, 
11)).reduce(lambda a, b: a + b))");
-            p.setAuthenticationInfo(anonymous);
-            note.run(p.getId());
-            waitForFinish(p);
-            assertEquals(Status.FINISHED, p.getStatus());
-            assertEquals("55\n", p.getResult().message().get(0).getData());
-            if (sparkVersion >= 13) {
-                // run sqlContext test
-                p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-                config = p.getConfig();
-                config.put("enabled", true);
-                p.setConfig(config);
-                p.setText("%pyspark from pyspark.sql import Row\n" +
-                        "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" 
+
-                        "df.collect()");
-                p.setAuthenticationInfo(anonymous);
-                note.run(p.getId());
-                waitForFinish(p);
-                assertEquals(Status.FINISHED, p.getStatus());
-                assertEquals("[Row(age=20, id=1)]\n", 
p.getResult().message().get(0).getData());
-
-                // test display Dataframe
-                p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-                config = p.getConfig();
-                config.put("enabled", true);
-                p.setConfig(config);
-                p.setText("%pyspark from pyspark.sql import Row\n" +
-                        "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" 
+
-                        "z.show(df)");
-                p.setAuthenticationInfo(anonymous);
-                note.run(p.getId());
-                waitForFinish(p);
-                assertEquals(Status.FINISHED, p.getStatus());
-                assertEquals(InterpreterResult.Type.TABLE, 
p.getResult().message().get(0).getType());
-                // TODO (zjffdu), one more \n is appended, need to investigate 
why.
-                assertEquals("age\tid\n20\t1\n", 
p.getResult().message().get(0).getData());
-
-                // test udf
-                p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-                config = p.getConfig();
-                config.put("enabled", true);
-                p.setConfig(config);
-                p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: 
len(x))\n" +
-                       "sqlContext.sql(\"select f1(\\\"abc\\\") as 
len\").collect()");
-                p.setAuthenticationInfo(anonymous);
-                note.run(p.getId());
-                waitForFinish(p);
-                assertEquals(Status.FINISHED, p.getStatus());
-                
assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) 
||
-                    
"[Row(len='3')]\n".equals(p.getResult().message().get(0).getData()));
-
-                // test exception
-                p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-                config = p.getConfig();
-                config.put("enabled", true);
-                p.setConfig(config);
-                /**
-                 %pyspark
-                 a=1
-
-                 print(a2)
-                 */
-                p.setText("%pyspark a=1\n\nprint(a2)");
-                p.setAuthenticationInfo(anonymous);
-                note.run(p.getId());
-                waitForFinish(p);
-                assertEquals(Status.ERROR, p.getStatus());
-                assertTrue(p.getResult().message().get(0).getData()
-                    .contains("Fail to execute line 3: print(a2)"));
-                assertTrue(p.getResult().message().get(0).getData()
-                    .contains("name 'a2' is not defined"));
-            }
-            if (sparkVersion >= 20) {
-                // run SparkSession test
-                p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-                config = p.getConfig();
-                config.put("enabled", true);
-                p.setConfig(config);
-                p.setText("%pyspark from pyspark.sql import Row\n" +
-                        "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" 
+
-                        "df.collect()");
-                p.setAuthenticationInfo(anonymous);
-                note.run(p.getId());
-                waitForFinish(p);
-                assertEquals(Status.FINISHED, p.getStatus());
-                assertEquals("[Row(age=20, id=1)]\n", 
p.getResult().message().get(0).getData());
-
-                // test udf
-                p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-                config = p.getConfig();
-                config.put("enabled", true);
-                p.setConfig(config);
-                // use SQLContext to register UDF but use this UDF through 
SparkSession
-                p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: 
len(x))\n" +
-                        "spark.sql(\"select f1(\\\"abc\\\") as 
len\").collect()");
-                p.setAuthenticationInfo(anonymous);
-                note.run(p.getId());
-                waitForFinish(p);
-                assertEquals(Status.FINISHED, p.getStatus());
-                
assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) 
||
-                    
"[Row(len='3')]\n".equals(p.getResult().message().get(0).getData()));
-            }
-        }
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+    String sqlContextName = "sqlContext";
+    if (isSpark2()) {
+      sqlContextName = "spark";
     }
-
-    @Test
-    public void pySparkAutoConvertOptionTest() throws IOException {
-        // create new note
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-        note.setName("note");
-
-        int sparkVersionNumber = getSparkVersionNumber(note);
-
-        if (isPyspark() && sparkVersionNumber >= 14) {   // auto_convert 
enabled from spark 1.4
-            // run markdown paragraph, again
-            Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-            Map config = p.getConfig();
-            config.put("enabled", true);
-            p.setConfig(config);
-
-            String sqlContextName = "sqlContext";
-            if (sparkVersionNumber >= 20) {
-                sqlContextName = "spark";
-            }
-
-            p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
-                    + "print(" + sqlContextName + ".range(0, 
10).withColumn('uniform', rand(seed=10) * 3.14).count())");
-            p.setAuthenticationInfo(anonymous);
-            note.run(p.getId());
-            waitForFinish(p);
-            assertEquals(Status.FINISHED, p.getStatus());
-            assertEquals("10\n", p.getResult().message().get(0).getData());
-        }
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 
23, 18))\n" +
+        "df <- createDataFrame(" + sqlContextName + ", localDF)\n" +
+        "count(df)"
+    );
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    System.err.println("sparkRTest=" + 
p.getResult().message().get(0).getData());
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals("[1] 3", p.getResult().message().get(0).getData().trim());
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void pySparkTest() throws IOException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    note.setName("note");
+
+    // run markdown paragraph, again
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%pyspark print(sc.parallelize(range(1, 11)).reduce(lambda a, b: 
a + b))");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals("55\n", p.getResult().message().get(0).getData());
+    if (!isSpark2()) {
+      // run sqlContext test
+      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      p.setText("%pyspark from pyspark.sql import Row\n" +
+          "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+          "df.collect()");
+      p.setAuthenticationInfo(anonymous);
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals("[Row(age=20, id=1)]\n", 
p.getResult().message().get(0).getData());
+
+      // test display Dataframe
+      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      p.setText("%pyspark from pyspark.sql import Row\n" +
+          "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+          "z.show(df)");
+      p.setAuthenticationInfo(anonymous);
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals(InterpreterResult.Type.TABLE, 
p.getResult().message().get(0).getType());
+      // TODO (zjffdu), one more \n is appended, need to investigate why.
+      assertEquals("age\tid\n20\t1\n", 
p.getResult().message().get(0).getData());
+
+      // test udf
+      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" 
+
+          "sqlContext.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+      p.setAuthenticationInfo(anonymous);
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      
assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) 
||
+          "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData()));
+
+      // test exception
+      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      /**
+       %pyspark
+       a=1
+
+       print(a2)
+       */
+      p.setText("%pyspark a=1\n\nprint(a2)");
+      p.setAuthenticationInfo(anonymous);
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.ERROR, p.getStatus());
+      assertTrue(p.getResult().message().get(0).getData()
+          .contains("Fail to execute line 3: print(a2)"));
+      assertTrue(p.getResult().message().get(0).getData()
+          .contains("name 'a2' is not defined"));
+    } else {
+      // run SparkSession test
+      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      p.setText("%pyspark from pyspark.sql import Row\n" +
+          "df=sqlContext.createDataFrame([Row(id=1, age=20)])\n" +
+          "df.collect()");
+      p.setAuthenticationInfo(anonymous);
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      assertEquals("[Row(age=20, id=1)]\n", 
p.getResult().message().get(0).getData());
+
+      // test udf
+      p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+      config = p.getConfig();
+      config.put("enabled", true);
+      p.setConfig(config);
+      // use SQLContext to register UDF but use this UDF through SparkSession
+      p.setText("%pyspark sqlContext.udf.register(\"f1\", lambda x: len(x))\n" 
+
+          "spark.sql(\"select f1(\\\"abc\\\") as len\").collect()");
+      p.setAuthenticationInfo(anonymous);
+      note.run(p.getId());
+      waitForFinish(p);
+      assertEquals(Status.FINISHED, p.getStatus());
+      
assertTrue("[Row(len=u'3')]\n".equals(p.getResult().message().get(0).getData()) 
||
+          "[Row(len='3')]\n".equals(p.getResult().message().get(0).getData()));
     }
 
-    @Test
-    public void zRunTest() throws IOException {
-        // create new note
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-        Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        Map config0 = p0.getConfig();
-        config0.put("enabled", true);
-        p0.setConfig(config0);
-        p0.setText("%spark z.run(1)");
-        p0.setAuthenticationInfo(anonymous);
-        Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        Map config1 = p1.getConfig();
-        config1.put("enabled", true);
-        p1.setConfig(config1);
-        p1.setText("%spark val a=10");
-        p1.setAuthenticationInfo(anonymous);
-        Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        Map config2 = p2.getConfig();
-        config2.put("enabled", true);
-        p2.setConfig(config2);
-        p2.setText("%spark print(a)");
-        p2.setAuthenticationInfo(anonymous);
-
-        note.run(p0.getId());
-        waitForFinish(p0);
-        assertEquals(Status.FINISHED, p0.getStatus());
-
-        // z.run is not blocking call. So p1 may not be finished when p0 is done.
-        waitForFinish(p1);
-        note.run(p2.getId());
-        waitForFinish(p2);
-        assertEquals(Status.FINISHED, p2.getStatus());
-        assertEquals("10", p2.getResult().message().get(0).getData());
-
-        Paragraph p3 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        Map config3 = p3.getConfig();
-        config3.put("enabled", true);
-        p3.setConfig(config3);
-        p3.setText("%spark println(new java.util.Date())");
-        p3.setAuthenticationInfo(anonymous);
-
-        p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId()));
-        note.run(p0.getId());
-        waitForFinish(p0);
-        waitForFinish(p1);
-        waitForFinish(p2);
-        waitForFinish(p3);
-
-        assertEquals(Status.FINISHED, p3.getStatus());
-        String p3result = p3.getResult().message().get(0).getData();
-        assertNotEquals(null, p3result);
-        assertNotEquals("", p3result);
-
-        p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId()));
-        p3.setText("%%spark println(\"END\")");
-
-        note.run(p0.getId());
-        waitForFinish(p0);
-        waitForFinish(p3);
-
-        assertNotEquals(p3result, p3.getResult().message());
-
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
-    }
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
 
-    @Test
-    public void pySparkDepLoaderTest() throws IOException, InterpreterException {
-        // create new note
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-        int sparkVersionNumber = getSparkVersionNumber(note);
-
-        if (isPyspark() && sparkVersionNumber >= 14) {
-            // restart spark interpreter
-            List<InterpreterSetting> settings =
-                    ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId());
-
-            for (InterpreterSetting setting : settings) {
-                if (setting.getName().equals("spark")) {
-                    ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
-                    break;
-                }
-            }
-
-            // load dep
-            Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-            Map config = p0.getConfig();
-            config.put("enabled", true);
-            p0.setConfig(config);
-            p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")");
-            p0.setAuthenticationInfo(anonymous);
-            note.run(p0.getId());
-            waitForFinish(p0);
-            assertEquals(Status.FINISHED, p0.getStatus());
-
-            // write test csv file
-            File tmpFile = File.createTempFile("test", "csv");
-            FileUtils.write(tmpFile, "a,b\n1,2");
-
-            // load data using libraries from dep loader
-            Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-            p1.setConfig(config);
-
-            String sqlContextName = "sqlContext";
-            if (sparkVersionNumber >= 20) {
-                sqlContextName = "spark";
-            }
-            p1.setText("%pyspark\n" +
-                    "from pyspark.sql import SQLContext\n" +
-                    "print(" + sqlContextName + 
".read.format('com.databricks.spark.csv')" +
-                    ".load('" + tmpFile.getAbsolutePath() +"').count())");
-            p1.setAuthenticationInfo(anonymous);
-            note.run(p1.getId());
-
-            waitForFinish(p1);
-            assertEquals(Status.FINISHED, p1.getStatus());
-            assertEquals("2\n", p1.getResult().message().get(0).getData());
-        }
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
-    }
+  @Test
+  public void pySparkAutoConvertOptionTest() throws IOException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    note.setName("note");
 
-    /**
-     * Get spark version number as a numerical value.
-     * eg. 1.1.x => 11, 1.2.x => 12, 1.3.x => 13 ...
-     */
-    private int getSparkVersionNumber(Note note) {
-        Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        note.setName("note");
-        Map config = p.getConfig();
-        config.put("enabled", true);
-        p.setConfig(config);
-        p.setText("%spark print(sc.version)");
-        p.setAuthenticationInfo(anonymous);
-        note.run(p.getId());
-        waitForFinish(p);
-        assertEquals(Status.FINISHED, p.getStatus());
-        String sparkVersion = p.getResult().message().get(0).getData();
-        System.out.println("Spark version detected " + sparkVersion);
-        String[] split = sparkVersion.split("\\.");
-        int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
-        return version;
-    }
+    // run markdown paragraph, again
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
 
-    @Test
-    public void testSparkZeppelinContextDynamicForms() throws IOException {
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-        Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        note.setName("note");
-        Map config = p.getConfig();
-        config.put("enabled", true);
-        p.setConfig(config);
-        String code = "%spark.spark println(z.textbox(\"my_input\", 
\"default_name\"))\n" +
-            "println(z.select(\"my_select\", \"1\"," +
-            "Seq((\"1\", \"select_1\"), (\"2\", \"select_2\"))))\n" +
-            "val items=z.checkbox(\"my_checkbox\", Seq(\"2\"), " +
-            "Seq((\"1\", \"check_1\"), (\"2\", \"check_2\")))\n" +
-            "println(items(0))";
-        p.setText(code);
-        p.setAuthenticationInfo(anonymous);
-        note.run(p.getId());
-        waitForFinish(p);
-
-        assertEquals(Status.FINISHED, p.getStatus());
-        Iterator<String> formIter = p.settings.getForms().keySet().iterator();
-        assert(formIter.next().equals("my_input"));
-        assert(formIter.next().equals("my_select"));
-        assert(formIter.next().equals("my_checkbox"));
-
-        // check dynamic forms values
-        String[] result = p.getResult().message().get(0).getData().split("\n");
-        assertEquals(4, result.length);
-        assertEquals("default_name", result[0]);
-        assertEquals("1", result[1]);
-        assertEquals("items: Seq[Object] = Buffer(2)", result[2]);
-        assertEquals("2", result[3]);
-
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+    String sqlContextName = "sqlContext";
+    if (isSpark2()) {
+      sqlContextName = "spark";
     }
 
-    @Test
-    public void testPySparkZeppelinContextDynamicForms() throws IOException {
-        Note note = ZeppelinServer.notebook.createNote(anonymous);
-        Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        note.setName("note");
-        Map config = p.getConfig();
-        config.put("enabled", true);
-        p.setConfig(config);
-        String code = "%spark.pyspark print(z.input('my_input', 
'default_name'))\n" +
-            "print(z.select('my_select', " +
-            "[('1', 'select_1'), ('2', 'select_2')], defaultValue='1'))\n" +
-            "items=z.checkbox('my_checkbox', " +
-            "[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" +
-            "print(items[0])";
-        p.setText(code);
-        p.setAuthenticationInfo(anonymous);
-        note.run(p.getId());
-        waitForFinish(p);
-
-        assertEquals(Status.FINISHED, p.getStatus());
-        Iterator<String> formIter = p.settings.getForms().keySet().iterator();
-        assert(formIter.next().equals("my_input"));
-        assert(formIter.next().equals("my_select"));
-        assert(formIter.next().equals("my_checkbox"));
-
-        // check dynamic forms values
-        String[] result = p.getResult().message().get(0).getData().split("\n");
-        assertEquals(3, result.length);
-        assertEquals("default_name", result[0]);
-        assertEquals("1", result[1]);
-        assertEquals("2", result[2]);
-
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+    p.setText("%pyspark\nfrom pyspark.sql.functions import *\n"
+        + "print(" + sqlContextName + ".range(0, 10).withColumn('uniform', 
rand(seed=10) * 3.14).count())");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals("10\n", p.getResult().message().get(0).getData());
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void zRunTest() throws IOException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config0 = p0.getConfig();
+    config0.put("enabled", true);
+    p0.setConfig(config0);
+    p0.setText("%spark z.run(1)");
+    p0.setAuthenticationInfo(anonymous);
+    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config1 = p1.getConfig();
+    config1.put("enabled", true);
+    p1.setConfig(config1);
+    p1.setText("%spark val a=10");
+    p1.setAuthenticationInfo(anonymous);
+    Paragraph p2 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config2 = p2.getConfig();
+    config2.put("enabled", true);
+    p2.setConfig(config2);
+    p2.setText("%spark print(a)");
+    p2.setAuthenticationInfo(anonymous);
+
+    note.run(p0.getId());
+    waitForFinish(p0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+
+    // z.run is not blocking call. So p1 may not be finished when p0 is done.
+    waitForFinish(p1);
+    note.run(p2.getId());
+    waitForFinish(p2);
+    assertEquals(Status.FINISHED, p2.getStatus());
+    assertEquals("10", p2.getResult().message().get(0).getData());
+
+    Paragraph p3 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config3 = p3.getConfig();
+    config3.put("enabled", true);
+    p3.setConfig(config3);
+    p3.setText("%spark println(new java.util.Date())");
+    p3.setAuthenticationInfo(anonymous);
+
+    p0.setText(String.format("%%spark z.runNote(\"%s\")", note.getId()));
+    note.run(p0.getId());
+    waitForFinish(p0);
+    waitForFinish(p1);
+    waitForFinish(p2);
+    waitForFinish(p3);
+
+    assertEquals(Status.FINISHED, p3.getStatus());
+    String p3result = p3.getResult().message().get(0).getData();
+    assertNotEquals(null, p3result);
+    assertNotEquals("", p3result);
+
+    p0.setText(String.format("%%spark z.run(\"%s\", \"%s\")", note.getId(), p3.getId()));
+    p3.setText("%%spark println(\"END\")");
+
+    note.run(p0.getId());
+    waitForFinish(p0);
+    waitForFinish(p3);
+
+    assertNotEquals(p3result, p3.getResult().message());
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void pySparkDepLoaderTest() throws IOException, InterpreterException {
+    // create new note
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+
+    // restart spark interpreter
+    List<InterpreterSetting> settings =
+        ZeppelinServer.notebook.getBindedInterpreterSettings(note.getId());
+
+    for (InterpreterSetting setting : settings) {
+      if (setting.getName().equals("spark")) {
+        ZeppelinServer.notebook.getInterpreterSettingManager().restart(setting.getId());
+        break;
+      }
     }
 
-    @Test
-    public void testConfInterpreter() throws IOException {
-        ZeppelinServer.notebook.getInterpreterSettingManager().close();
-        Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
-        Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        Map config = p.getConfig();
-        config.put("enabled", true);
-        p.setConfig(config);
-        p.setText("%spark.conf 
spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0");
-        p.setAuthenticationInfo(anonymous);
-        note.run(p.getId());
-        waitForFinish(p);
-        assertEquals(Status.FINISHED, p.getStatus());
-
-        Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
-        p1.setConfig(config);
-        p1.setText("%spark\nimport com.databricks.spark.csv._");
-        p1.setAuthenticationInfo(anonymous);
-        note.run(p1.getId());
-
-        waitForFinish(p1);
-        assertEquals(Status.FINISHED, p1.getStatus());
-
-        ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
-
+    // load dep
+    Paragraph p0 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p0.getConfig();
+    config.put("enabled", true);
+    p0.setConfig(config);
+    p0.setText("%dep z.load(\"com.databricks:spark-csv_2.11:1.2.0\")");
+    p0.setAuthenticationInfo(anonymous);
+    note.run(p0.getId());
+    waitForFinish(p0);
+    assertEquals(Status.FINISHED, p0.getStatus());
+
+    // write test csv file
+    File tmpFile = File.createTempFile("test", "csv");
+    FileUtils.write(tmpFile, "a,b\n1,2");
+
+    // load data using libraries from dep loader
+    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    p1.setConfig(config);
+
+    String sqlContextName = "sqlContext";
+    if (isSpark2()) {
+      sqlContextName = "spark";
     }
+    p1.setText("%pyspark\n" +
+        "from pyspark.sql import SQLContext\n" +
+        "print(" + sqlContextName + ".read.format('com.databricks.spark.csv')" 
+
+        ".load('" + tmpFile.getAbsolutePath() + "').count())");
+    p1.setAuthenticationInfo(anonymous);
+    note.run(p1.getId());
+
+    waitForFinish(p1);
+    assertEquals(Status.FINISHED, p1.getStatus());
+    assertEquals("2\n", p1.getResult().message().get(0).getData());
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  private void verifySparkVersionNumber() throws IOException {
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    note.setName("note");
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%spark print(sc.version)");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+    assertEquals(sparkVersion, p.getResult().message().get(0).getData());
+  }
+
+  private int toIntSparkVersion(String sparkVersion) {
+    String[] split = sparkVersion.split("\\.");
+    int version = Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
+    return version;
+  }
+
+  private boolean isSpark2() {
+    return toIntSparkVersion(sparkVersion) >= 20;
+  }
+
+  @Test
+  public void testSparkZeppelinContextDynamicForms() throws IOException {
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    note.setName("note");
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    String code = "%spark.spark println(z.textbox(\"my_input\", 
\"default_name\"))\n" +
+        "println(z.select(\"my_select\", \"1\"," +
+        "Seq((\"1\", \"select_1\"), (\"2\", \"select_2\"))))\n" +
+        "val items=z.checkbox(\"my_checkbox\", Seq(\"2\"), " +
+        "Seq((\"1\", \"check_1\"), (\"2\", \"check_2\")))\n" +
+        "println(items(0))";
+    p.setText(code);
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+
+    assertEquals(Status.FINISHED, p.getStatus());
+    Iterator<String> formIter = p.settings.getForms().keySet().iterator();
+    assert (formIter.next().equals("my_input"));
+    assert (formIter.next().equals("my_select"));
+    assert (formIter.next().equals("my_checkbox"));
+
+    // check dynamic forms values
+    String[] result = p.getResult().message().get(0).getData().split("\n");
+    assertEquals(4, result.length);
+    assertEquals("default_name", result[0]);
+    assertEquals("1", result[1]);
+    assertEquals("items: Seq[Object] = Buffer(2)", result[2]);
+    assertEquals("2", result[3]);
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void testPySparkZeppelinContextDynamicForms() throws IOException {
+    Note note = ZeppelinServer.notebook.createNote(anonymous);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    note.setName("note");
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    String code = "%spark.pyspark print(z.input('my_input', 
'default_name'))\n" +
+        "print(z.select('my_select', " +
+        "[('1', 'select_1'), ('2', 'select_2')], defaultValue='1'))\n" +
+        "items=z.checkbox('my_checkbox', " +
+        "[('1', 'check_1'), ('2', 'check_2')], defaultChecked=['2'])\n" +
+        "print(items[0])";
+    p.setText(code);
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+
+    assertEquals(Status.FINISHED, p.getStatus());
+    Iterator<String> formIter = p.settings.getForms().keySet().iterator();
+    assert (formIter.next().equals("my_input"));
+    assert (formIter.next().equals("my_select"));
+    assert (formIter.next().equals("my_checkbox"));
+
+    // check dynamic forms values
+    String[] result = p.getResult().message().get(0).getData().split("\n");
+    assertEquals(3, result.length);
+    assertEquals("default_name", result[0]);
+    assertEquals("1", result[1]);
+    assertEquals("2", result[2]);
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+  }
+
+  @Test
+  public void testConfInterpreter() throws IOException {
+    ZeppelinServer.notebook.getInterpreterSettingManager().close();
+    Note note = ZeppelinServer.notebook.createNote(AuthenticationInfo.ANONYMOUS);
+    Paragraph p = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    Map config = p.getConfig();
+    config.put("enabled", true);
+    p.setConfig(config);
+    p.setText("%spark.conf 
spark.jars.packages\tcom.databricks:spark-csv_2.11:1.2.0");
+    p.setAuthenticationInfo(anonymous);
+    note.run(p.getId());
+    waitForFinish(p);
+    assertEquals(Status.FINISHED, p.getStatus());
+
+    Paragraph p1 = note.addNewParagraph(AuthenticationInfo.ANONYMOUS);
+    p1.setConfig(config);
+    p1.setText("%spark\nimport com.databricks.spark.csv._");
+    p1.setAuthenticationInfo(anonymous);
+    note.run(p1.getId());
+
+    waitForFinish(p1);
+    assertEquals(Status.FINISHED, p1.getStatus());
+
+    ZeppelinServer.notebook.removeNote(note.getId(), anonymous);
+
+  }
 }
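
The rewritten tests above compare interpreter output against a sparkVersion field (see verifySparkVersionNumber) and branch on isSpark2() instead of probing the version inside every test. The sketch below shows one way such a field could be supplied through JUnit's Parameterized runner so the same Zeppelin build is exercised against several Spark versions; the class name, version list and constructor wiring are assumptions for illustration, not code from this commit.

import java.util.Arrays;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

// Hypothetical sketch only: the actual wiring of sparkVersion is not shown in this hunk.
@RunWith(Parameterized.class)
public class SparkVersionedTestSketch {

  @Parameterized.Parameters
  public static List<Object[]> sparkVersions() {
    // Each entry is one Spark version the whole test class is run against (assumed list).
    return Arrays.asList(new Object[][]{{"2.2.0"}, {"2.1.2"}, {"1.6.3"}});
  }

  private final String sparkVersion;

  public SparkVersionedTestSketch(String sparkVersion) {
    this.sparkVersion = sparkVersion;
  }

  // Same helper shape as in ZeppelinSparkClusterTest: "2.2.0" -> 22, "1.6.3" -> 16.
  private int toIntSparkVersion(String version) {
    String[] split = version.split("\\.");
    return Integer.parseInt(split[0]) * 10 + Integer.parseInt(split[1]);
  }

  private boolean isSpark2() {
    return toIntSparkVersion(sparkVersion) >= 20;
  }

  @Test
  public void printVersion() {
    System.out.println("Running against Spark " + sparkVersion + ", isSpark2=" + isSpark2());
  }
}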

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-server/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/zeppelin-server/src/test/resources/log4j.properties b/zeppelin-server/src/test/resources/log4j.properties
index 8368993..aff3b79 100644
--- a/zeppelin-server/src/test/resources/log4j.properties
+++ b/zeppelin-server/src/test/resources/log4j.properties
@@ -42,5 +42,4 @@ log4j.logger.DataNucleus.Datastore=ERROR
 # Log all JDBC parameters
 log4j.logger.org.hibernate.type=ALL
 
-log4j.logger.org.apache.zeppelin.interpreter=DEBUG
-log4j.logger.org.apache.zeppelin.spark=DEBUG
+log4j.logger.org.apache.hadoop=WARN

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-zengine/pom.xml
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/pom.xml b/zeppelin-zengine/pom.xml
index 81ce716..cdb11dd 100644
--- a/zeppelin-zengine/pom.xml
+++ b/zeppelin-zengine/pom.xml
@@ -391,6 +391,20 @@
           </systemProperties>
         </configuration>
       </plugin>
+
+      <!-- publish test jar as well so that zeppelin-server module can use it 
-->
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>3.0.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
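
The test-jar produced by the plugin added above is what lets another module reuse helpers such as SparkDownloadUtils. As a sketch, a consuming module (for example zeppelin-server) would typically pull it in with a test-jar dependency like the one below; the coordinates and version property are assumptions, not taken from this diff.

<!-- Sketch of a consumer-side declaration (assumed coordinates, not part of this commit) -->
<dependency>
  <groupId>org.apache.zeppelin</groupId>
  <artifactId>zeppelin-zengine</artifactId>
  <version>${project.version}</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>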
 

http://git-wip-us.apache.org/repos/asf/zeppelin/blob/d6cdd56d/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
----------------------------------------------------------------------
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
new file mode 100644
index 0000000..e027bb0
--- /dev/null
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkDownloadUtils.java
@@ -0,0 +1,91 @@
+package org.apache.zeppelin.interpreter;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.net.URL;
+
+public class SparkDownloadUtils {
+  private static Logger LOGGER = LoggerFactory.getLogger(SparkDownloadUtils.class);
+
+  private static String downloadFolder = System.getProperty("user.home") + "/.cache/spark";
+
+  static {
+    try {
+      FileUtils.forceMkdir(new File(downloadFolder));
+    } catch (IOException e) {
+      throw new RuntimeException("Fail to create downloadFolder: " + downloadFolder, e);
+    }
+  }
+
+
+  public static String downloadSpark(String version) {
+    File targetSparkHomeFolder = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6");
+    if (targetSparkHomeFolder.exists()) {
+      LOGGER.info("Skip to download spark as it is already downloaded.");
+      return targetSparkHomeFolder.getAbsolutePath();
+    }
+    // Try mirrors a few times until one succeeds
+    for (int i = 0; i < 3; i++) {
+      try {
+        String preferredMirror = IOUtils.toString(new URL("https://www.apache.org/dyn/closer.lua?preferred=true"));
+        File downloadFile = new File(downloadFolder + "/spark-" + version + "-bin-hadoop2.6.tgz");
+        String downloadURL = preferredMirror + "/spark/spark-" + version + "/spark-" + version + "-bin-hadoop2.6.tgz";
+        runShellCommand(new String[] {"wget", downloadURL, "-P", downloadFolder});
+        runShellCommand(new String[]{"tar", "-xvf", downloadFile.getAbsolutePath(), "-C", downloadFolder});
+        break;
+      } catch (Exception e) {
+        LOGGER.warn("Failed to download Spark", e);
+      }
+    }
+    return targetSparkHomeFolder.getAbsolutePath();
+  }
+
+  private static void runShellCommand(String[] commands) throws IOException, InterruptedException {
+    LOGGER.info("Starting shell commands: " + StringUtils.join(commands, " "));
+    Process process = Runtime.getRuntime().exec(commands);
+    StreamGobbler errorGobbler = new StreamGobbler(process.getErrorStream());
+    StreamGobbler outputGobbler = new StreamGobbler(process.getInputStream());
+    errorGobbler.start();
+    outputGobbler.start();
+    if (process.waitFor() != 0) {
+      throw new IOException("Fail to run shell commands: " + StringUtils.join(commands, " "));
+    }
+    LOGGER.info("Complete shell commands: " + StringUtils.join(commands, " "));
+  }
+
+  private static class StreamGobbler extends Thread {
+    InputStream is;
+
+    // reads everything from is until empty.
+    StreamGobbler(InputStream is) {
+      this.is = is;
+    }
+
+    public void run() {
+      try {
+        InputStreamReader isr = new InputStreamReader(is);
+        BufferedReader br = new BufferedReader(isr);
+        String line = null;
+        long startTime = System.currentTimeMillis();
+        while ( (line = br.readLine()) != null) {
+          // logging per 5 seconds
+          if ((System.currentTimeMillis() - startTime) > 5000) {
+            LOGGER.info(line);
+            startTime = System.currentTimeMillis();
+          }
+        }
+      } catch (IOException ioe) {
+        ioe.printStackTrace();
+      }
+    }
+  }
+}
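
SparkDownloadUtils caches each distribution under ~/.cache/spark and hands back the unpacked directory, which a test can then use as SPARK_HOME. A minimal usage sketch follows; only downloadSpark comes from this commit, while the surrounding class and the way the path is consumed are illustrative assumptions.

import org.apache.zeppelin.interpreter.SparkDownloadUtils;

// Hypothetical usage sketch; not part of this commit.
public class SparkDownloadUtilsExample {
  public static void main(String[] args) {
    // Downloads spark-2.2.0-bin-hadoop2.6 on first use, reuses the cached copy afterwards.
    String sparkHome = SparkDownloadUtils.downloadSpark("2.2.0");

    // A test would typically point the Spark interpreter at this directory,
    // e.g. via a SPARK_HOME property on the interpreter setting (illustrative).
    System.out.println("SPARK_HOME=" + sparkHome);
  }
}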
